From 0e2b19de25b280bfd45fa904b1911c3b355b7ef5 Mon Sep 17 00:00:00 2001
From: Gabriel Luiz Freitas Almeida
Date: Thu, 4 Jul 2024 09:24:45 -0300
Subject: [PATCH] fix: make end_all_traces be called at the correct moment (#2516)

* fix(tracing/service.py): remove wait_for_all_tracers call

* refactor: set reasonable timeout for TelemetryService client

* fix: handle HTTP and request errors in TelemetryService

Handle HTTPStatusError and RequestError exceptions separately in the send_telemetry_data method of TelemetryService to provide more specific error messages. Also, catch any unexpected exceptions and log them with an appropriate error message.

* fix: cancel worker task and close client in TelemetryService stop method

Cancel the worker task and close the client in the stop method of TelemetryService to ensure proper cleanup and prevent potential resource leaks. Also handle any exceptions that may occur during the cleanup process and log appropriate error messages.

* style(telemetry/service.py): fix indentation issue in await statement to comply with PEP8 guidelines

* feat(graph): add method to remove vertex from runnables in Graph class

* fix(chat.py): fix issue where vertex was not being removed from runnables list to prevent duplication of results

* fix(chat.py): defines when the end_all_traces call should happen
---
 src/backend/base/langflow/api/v1/chat.py         |  7 +++----
 src/backend/base/langflow/graph/graph/base.py    |  3 +++
 .../base/langflow/services/telemetry/service.py  | 15 +++++++++++----
 .../base/langflow/services/tracing/service.py    |  2 --
 4 files changed, 17 insertions(+), 10 deletions(-)

diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py
index e193425684c3..ecd85748ed9d 100644
--- a/src/backend/base/langflow/api/v1/chat.py
+++ b/src/backend/base/langflow/api/v1/chat.py
@@ -111,6 +111,7 @@ async def retrieve_vertices_order(
         # which duplicates the results
         for vertex_id in first_layer:
             graph.remove_from_predecessors(vertex_id)
+            graph.remove_vertex_from_runnables(vertex_id)
 
         # Now vertices is a list of lists
         # We need to get the id of each vertex
@@ -160,7 +161,7 @@ async def build_vertex(
     Args:
         flow_id (str): The ID of the flow.
        vertex_id (str): The ID of the vertex to build.
-        background_tasks (BackgroundTasks): The background tasks object for logging.
+        background_tasks (BackgroundTasks): The background tasks dependency.
        inputs (Optional[InputValueRequest], optional): The input values for the vertex. Defaults to None.
        chat_service (ChatService, optional): The chat service dependency. Defaults to Depends(get_chat_service).
        current_user (Any, optional): The current user dependency. Defaults to Depends(get_current_active_user).
@@ -211,8 +212,6 @@ async def build_vertex(
             )
             top_level_vertices = graph.run_manager.get_top_level_vertices(graph, next_runnable_vertices)
-            result_data_response = ResultDataResponse(**result_dict.model_dump())
-
             result_data_response = ResultDataResponse.model_validate(result_dict, from_attributes=True)
 
     except Exception as exc:
         if isinstance(exc, ComponentBuildException):
@@ -265,7 +264,7 @@ async def build_vertex(
 
         if graph.stop_vertex and graph.stop_vertex in next_runnable_vertices:
             next_runnable_vertices = [graph.stop_vertex]
-        if not next_runnable_vertices:
+        if not graph.run_manager.vertices_to_run and not next_runnable_vertices:
             background_tasks.add_task(graph.end_all_traces)
 
         build_response = VertexBuildResponse(
diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py
index a0f84292e45b..a83a971b55ab 100644
--- a/src/backend/base/langflow/graph/graph/base.py
+++ b/src/backend/base/langflow/graph/graph/base.py
@@ -1469,6 +1469,9 @@ def find_runnable_predecessors_for_successors(self, vertex_id: str) -> List[str]
     def remove_from_predecessors(self, vertex_id: str):
         self.run_manager.remove_from_predecessors(vertex_id)
 
+    def remove_vertex_from_runnables(self, vertex_id: str):
+        self.run_manager.remove_vertex_from_runnables(vertex_id)
+
     def build_in_degree(self, edges: List[ContractEdge]) -> Dict[str, int]:
         in_degree: Dict[str, int] = defaultdict(int)
         for edge in edges:
diff --git a/src/backend/base/langflow/services/telemetry/service.py b/src/backend/base/langflow/services/telemetry/service.py
index 76e82835c38e..9bed7f5536bf 100644
--- a/src/backend/base/langflow/services/telemetry/service.py
+++ b/src/backend/base/langflow/services/telemetry/service.py
@@ -1,4 +1,5 @@
 import asyncio
+import contextlib
 import os
 import platform
 from datetime import datetime, timezone
@@ -30,7 +31,7 @@ def __init__(self, settings_service: "SettingsService"):
         self.settings_service = settings_service
         self.base_url = settings_service.settings.telemetry_base_url
         self.telemetry_queue: asyncio.Queue = asyncio.Queue()
-        self.client = httpx.AsyncClient(timeout=None)
+        self.client = httpx.AsyncClient(timeout=10.0)  # Set a reasonable timeout
         self.running = False
 
         self.package = get_version_info()["package"]
@@ -63,8 +64,12 @@ async def send_telemetry_data(self, payload: BaseModel, path: str | None = None)
                 logger.error(f"Failed to send telemetry data: {response.status_code} {response.text}")
             else:
                 logger.debug("Telemetry data sent successfully.")
+        except httpx.HTTPStatusError as e:
+            logger.error(f"HTTP error occurred: {e}")
+        except httpx.RequestError as e:
+            logger.error(f"Request error occurred: {e}")
         except Exception as e:
-            logger.error(f"Failed to send telemetry data due to: {e}")
+            logger.error(f"Unexpected error occurred: {e}")
 
     async def log_package_run(self, payload: RunPayload):
         await self.telemetry_queue.put((self.send_telemetry_data, payload, "run"))
@@ -119,8 +124,10 @@ async def stop(self):
         try:
             self.running = False
             await self.flush()
-            self.worker_task.cancel()
             if self.worker_task:
-                await self.worker_task
+                self.worker_task.cancel()
+                with contextlib.suppress(asyncio.CancelledError):
+                    await self.worker_task
+            await self.client.aclose()
         except Exception as e:
             logger.error(f"Error stopping tracing service: {e}")
diff --git a/src/backend/base/langflow/services/tracing/service.py b/src/backend/base/langflow/services/tracing/service.py
index 749a4b910423..d3307e8b31eb 100644
--- a/src/backend/base/langflow/services/tracing/service.py
+++ b/src/backend/base/langflow/services/tracing/service.py
@@ -7,7 +7,6 @@
 from typing import TYPE_CHECKING, Any, Dict, Optional
 from uuid import UUID
 
-from langchain.callbacks.tracers.langchain import wait_for_all_tracers
 from loguru import logger
 
 from langflow.schema.data import Data
@@ -292,5 +291,4 @@ def end(
             self._run_tree.add_metadata(metadata)
         self._run_tree.end(outputs=outputs, error=error)
         self._run_tree.post()
-        wait_for_all_tracers()
         self._run_link = self._run_tree.get_url()
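
The stop() hunk in the telemetry service above combines three cleanup steps: cancel the queue worker task, await it while swallowing the CancelledError that cancellation raises inside the task, and close the shared httpx client so connections do not leak. The sketch below is not part of the patch; it shows the same shutdown pattern in isolation, and the TinyWorker class and its attribute names are illustrative rather than Langflow's.

    import asyncio
    import contextlib

    import httpx


    class TinyWorker:
        """Illustrative sketch of the cancel-then-close shutdown pattern."""

        def __init__(self) -> None:
            self.queue: asyncio.Queue = asyncio.Queue()
            # A finite timeout instead of timeout=None, as in the patch.
            self.client = httpx.AsyncClient(timeout=10.0)
            self.worker_task: asyncio.Task | None = None

        async def start(self) -> None:
            # Background task that would normally drain the queue.
            self.worker_task = asyncio.create_task(self._worker())

        async def _worker(self) -> None:
            while True:
                await self.queue.get()
                self.queue.task_done()

        async def stop(self) -> None:
            if self.worker_task:
                # Cancel the worker, then await it while suppressing the
                # CancelledError that cancellation raises in the awaiter.
                self.worker_task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await self.worker_task
            # Close the HTTP client so no connections are leaked on shutdown.
            await self.client.aclose()


    async def main() -> None:
        worker = TinyWorker()
        await worker.start()
        await worker.stop()


    if __name__ == "__main__":
        asyncio.run(main())

Guarding the cancel with "if self.worker_task" mirrors the patch: stop() may run before the worker was ever started, and cancelling None would otherwise raise.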
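The send_telemetry_data hunk separates three failure modes: an HTTP response with an error status, a transport-level request failure, and anything unexpected, each logged with its own message. A minimal standalone sketch of that exception ordering follows; it is not the Langflow implementation, the URL is a placeholder (https://example.com/telemetry), and raise_for_status() is used here to surface HTTPStatusError for 4xx/5xx responses.

    import asyncio

    import httpx


    async def post_with_layered_errors(url: str, payload: dict) -> None:
        # Most specific handlers first, catch-all last, as in the patch.
        async with httpx.AsyncClient(timeout=10.0) as client:
            try:
                response = await client.post(url, json=payload)
                # Turn 4xx/5xx responses into httpx.HTTPStatusError.
                response.raise_for_status()
            except httpx.HTTPStatusError as e:
                print(f"HTTP error occurred: {e}")
            except httpx.RequestError as e:
                # Connection errors, timeouts, DNS failures, and similar.
                print(f"Request error occurred: {e}")
            except Exception as e:
                print(f"Unexpected error occurred: {e}")


    if __name__ == "__main__":
        asyncio.run(post_with_layered_errors("https://example.com/telemetry", {"ping": 1}))

Ordering matters: httpx.HTTPStatusError and httpx.RequestError are sibling subclasses of httpx.HTTPError, so neither shadows the other, and the bare Exception handler stays last to catch anything else.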