Skip to content

Commit

Permalink
fix: make end_all_traces be called at the correct moment (langflow-ai…
Browse files Browse the repository at this point in the history
…#2516)

* fix(tracing/service.py): remove wait_for_all_tracers call

* refactor: set reasonable timeout for TelemetryService client

* fix: handle HTTP and request errors in TelemetryService

Handle HTTPStatusError and RequestError exceptions separately in the send_telemetry_data method of TelemetryService to provide more specific error messages. Also, catch any unexpected exceptions and log them with an appropriate error message.

* fix: cancel worker task and close client in TelemetryService stop method

Cancel the worker task and close the client in the stop method of TelemetryService to ensure proper cleanup and prevent potential resource leaks. Also handle any exceptions that may occur during the cleanup process and log appropriate error messages.

* style(telemetry/service.py): fix indentation issue in await statement to comply with PEP8 guidelines

* feat(graph): add method to remove vertex from runnables in Graph class

* fix(chat.py): fix issue where vertex was not being removed from runnables list to prevent duplication of results

* fix(chat.py): defines when the end_all_traces call should happen

(cherry picked from commit 15aa68a)
  • Loading branch information
ogabrielluiz authored and nicoloboschi committed Jul 10, 2024
1 parent a120094 commit 9b86540
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 10 deletions.
7 changes: 3 additions & 4 deletions src/backend/base/langflow/api/v1/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ async def retrieve_vertices_order(
# which duplicates the results
for vertex_id in first_layer:
graph.remove_from_predecessors(vertex_id)
graph.remove_vertex_from_runnables(vertex_id)

# Now vertices is a list of lists
# We need to get the id of each vertex
Expand Down Expand Up @@ -160,7 +161,7 @@ async def build_vertex(
Args:
flow_id (str): The ID of the flow.
vertex_id (str): The ID of the vertex to build.
background_tasks (BackgroundTasks): The background tasks object for logging.
background_tasks (BackgroundTasks): The background tasks dependency.
inputs (Optional[InputValueRequest], optional): The input values for the vertex. Defaults to None.
chat_service (ChatService, optional): The chat service dependency. Defaults to Depends(get_chat_service).
current_user (Any, optional): The current user dependency. Defaults to Depends(get_current_active_user).
Expand Down Expand Up @@ -211,8 +212,6 @@ async def build_vertex(
)
top_level_vertices = graph.run_manager.get_top_level_vertices(graph, next_runnable_vertices)

result_data_response = ResultDataResponse(**result_dict.model_dump())

result_data_response = ResultDataResponse.model_validate(result_dict, from_attributes=True)
except Exception as exc:
if isinstance(exc, ComponentBuildException):
Expand Down Expand Up @@ -265,7 +264,7 @@ async def build_vertex(
if graph.stop_vertex and graph.stop_vertex in next_runnable_vertices:
next_runnable_vertices = [graph.stop_vertex]

if not next_runnable_vertices:
if not graph.run_manager.vertices_to_run and not next_runnable_vertices:
background_tasks.add_task(graph.end_all_traces)

build_response = VertexBuildResponse(
Expand Down
3 changes: 3 additions & 0 deletions src/backend/base/langflow/graph/graph/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1469,6 +1469,9 @@ def find_runnable_predecessors_for_successors(self, vertex_id: str) -> List[str]
def remove_from_predecessors(self, vertex_id: str):
self.run_manager.remove_from_predecessors(vertex_id)

def remove_vertex_from_runnables(self, vertex_id: str):
self.run_manager.remove_vertex_from_runnables(vertex_id)

def build_in_degree(self, edges: List[ContractEdge]) -> Dict[str, int]:
in_degree: Dict[str, int] = defaultdict(int)
for edge in edges:
Expand Down
15 changes: 11 additions & 4 deletions src/backend/base/langflow/services/telemetry/service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import contextlib
import os
import platform
from datetime import datetime, timezone
Expand Down Expand Up @@ -30,7 +31,7 @@ def __init__(self, settings_service: "SettingsService"):
self.settings_service = settings_service
self.base_url = settings_service.settings.telemetry_base_url
self.telemetry_queue: asyncio.Queue = asyncio.Queue()
self.client = httpx.AsyncClient(timeout=None)
self.client = httpx.AsyncClient(timeout=10.0) # Set a reasonable timeout
self.running = False
self.package = get_version_info()["package"]

Expand Down Expand Up @@ -63,8 +64,12 @@ async def send_telemetry_data(self, payload: BaseModel, path: str | None = None)
logger.error(f"Failed to send telemetry data: {response.status_code} {response.text}")
else:
logger.debug("Telemetry data sent successfully.")
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error occurred: {e}")
except httpx.RequestError as e:
logger.error(f"Request error occurred: {e}")
except Exception as e:
logger.error(f"Failed to send telemetry data due to: {e}")
logger.error(f"Unexpected error occurred: {e}")

async def log_package_run(self, payload: RunPayload):
await self.telemetry_queue.put((self.send_telemetry_data, payload, "run"))
Expand Down Expand Up @@ -119,8 +124,10 @@ async def stop(self):
try:
self.running = False
await self.flush()
self.worker_task.cancel()
if self.worker_task:
await self.worker_task
self.worker_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self.worker_task
await self.client.aclose()
except Exception as e:
logger.error(f"Error stopping tracing service: {e}")
2 changes: 0 additions & 2 deletions src/backend/base/langflow/services/tracing/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from typing import TYPE_CHECKING, Any, Dict, Optional
from uuid import UUID

from langchain.callbacks.tracers.langchain import wait_for_all_tracers
from loguru import logger

from langflow.schema.data import Data
Expand Down Expand Up @@ -292,5 +291,4 @@ def end(
self._run_tree.add_metadata(metadata)
self._run_tree.end(outputs=outputs, error=error)
self._run_tree.post()
wait_for_all_tracers()
self._run_link = self._run_tree.get_url()

0 comments on commit 9b86540

Please sign in to comment.