diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py
index e193425684c..ecd85748ed9 100644
--- a/src/backend/base/langflow/api/v1/chat.py
+++ b/src/backend/base/langflow/api/v1/chat.py
@@ -111,6 +111,7 @@ async def retrieve_vertices_order(
     # which duplicates the results
     for vertex_id in first_layer:
         graph.remove_from_predecessors(vertex_id)
+        graph.remove_vertex_from_runnables(vertex_id)
 
     # Now vertices is a list of lists
     # We need to get the id of each vertex
@@ -160,7 +161,7 @@ async def build_vertex(
     Args:
         flow_id (str): The ID of the flow.
         vertex_id (str): The ID of the vertex to build.
-        background_tasks (BackgroundTasks): The background tasks object for logging.
+        background_tasks (BackgroundTasks): The background tasks dependency.
         inputs (Optional[InputValueRequest], optional): The input values for the vertex. Defaults to None.
         chat_service (ChatService, optional): The chat service dependency. Defaults to Depends(get_chat_service).
         current_user (Any, optional): The current user dependency. Defaults to Depends(get_current_active_user).
@@ -211,8 +212,6 @@ async def build_vertex(
             )
 
             top_level_vertices = graph.run_manager.get_top_level_vertices(graph, next_runnable_vertices)
 
-            result_data_response = ResultDataResponse(**result_dict.model_dump())
-            result_data_response = ResultDataResponse.model_validate(result_dict, from_attributes=True)
         except Exception as exc:
             if isinstance(exc, ComponentBuildException):
@@ -265,7 +264,7 @@ async def build_vertex(
         if graph.stop_vertex and graph.stop_vertex in next_runnable_vertices:
             next_runnable_vertices = [graph.stop_vertex]
 
-        if not next_runnable_vertices:
+        if not graph.run_manager.vertices_to_run and not next_runnable_vertices:
             background_tasks.add_task(graph.end_all_traces)
 
         build_response = VertexBuildResponse(
diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py
index a0f84292e45..a83a971b55a 100644
--- a/src/backend/base/langflow/graph/graph/base.py
+++ b/src/backend/base/langflow/graph/graph/base.py
@@ -1469,6 +1469,9 @@ def find_runnable_predecessors_for_successors(self, vertex_id: str) -> List[str]
     def remove_from_predecessors(self, vertex_id: str):
         self.run_manager.remove_from_predecessors(vertex_id)
 
+    def remove_vertex_from_runnables(self, vertex_id: str):
+        self.run_manager.remove_vertex_from_runnables(vertex_id)
+
     def build_in_degree(self, edges: List[ContractEdge]) -> Dict[str, int]:
         in_degree: Dict[str, int] = defaultdict(int)
         for edge in edges:
diff --git a/src/backend/base/langflow/services/telemetry/service.py b/src/backend/base/langflow/services/telemetry/service.py
index 76e82835c38..9bed7f5536b 100644
--- a/src/backend/base/langflow/services/telemetry/service.py
+++ b/src/backend/base/langflow/services/telemetry/service.py
@@ -1,4 +1,5 @@
 import asyncio
+import contextlib
 import os
 import platform
 from datetime import datetime, timezone
@@ -30,7 +31,7 @@ def __init__(self, settings_service: "SettingsService"):
         self.settings_service = settings_service
         self.base_url = settings_service.settings.telemetry_base_url
         self.telemetry_queue: asyncio.Queue = asyncio.Queue()
-        self.client = httpx.AsyncClient(timeout=None)
+        self.client = httpx.AsyncClient(timeout=10.0)  # Set a reasonable timeout
         self.running = False
         self.package = get_version_info()["package"]
 
@@ -63,8 +64,12 @@ async def send_telemetry_data(self, payload: BaseModel, path: str | None = None)
                 logger.error(f"Failed to send telemetry data: {response.status_code} {response.text}")
             else:
                 logger.debug("Telemetry data sent successfully.")
+        except httpx.HTTPStatusError as e:
+            logger.error(f"HTTP error occurred: {e}")
+        except httpx.RequestError as e:
+            logger.error(f"Request error occurred: {e}")
         except Exception as e:
-            logger.error(f"Failed to send telemetry data due to: {e}")
+            logger.error(f"Unexpected error occurred: {e}")
 
     async def log_package_run(self, payload: RunPayload):
         await self.telemetry_queue.put((self.send_telemetry_data, payload, "run"))
@@ -119,8 +124,10 @@ async def stop(self):
         try:
             self.running = False
             await self.flush()
-            self.worker_task.cancel()
             if self.worker_task:
-                await self.worker_task
+                self.worker_task.cancel()
+                with contextlib.suppress(asyncio.CancelledError):
+                    await self.worker_task
+            await self.client.aclose()
         except Exception as e:
             logger.error(f"Error stopping tracing service: {e}")
diff --git a/src/backend/base/langflow/services/tracing/service.py b/src/backend/base/langflow/services/tracing/service.py
index 749a4b91042..d3307e8b31e 100644
--- a/src/backend/base/langflow/services/tracing/service.py
+++ b/src/backend/base/langflow/services/tracing/service.py
@@ -7,7 +7,6 @@
 from typing import TYPE_CHECKING, Any, Dict, Optional
 from uuid import UUID
 
-from langchain.callbacks.tracers.langchain import wait_for_all_tracers
 from loguru import logger
 
 from langflow.schema.data import Data
@@ -292,5 +291,4 @@ def end(
             self._run_tree.add_metadata(metadata)
         self._run_tree.end(outputs=outputs, error=error)
         self._run_tree.post()
-        wait_for_all_tracers()
         self._run_link = self._run_tree.get_url()