Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix: make end_all_traces be called at the correct moment #2516

Merged
merged 8 commits into from
Jul 4, 2024
7 changes: 3 additions & 4 deletions src/backend/base/langflow/api/v1/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ async def retrieve_vertices_order(
# which duplicates the results
for vertex_id in first_layer:
graph.remove_from_predecessors(vertex_id)
graph.remove_vertex_from_runnables(vertex_id)

# Now vertices is a list of lists
# We need to get the id of each vertex
Expand Down Expand Up @@ -160,7 +161,7 @@ async def build_vertex(
Args:
flow_id (str): The ID of the flow.
vertex_id (str): The ID of the vertex to build.
background_tasks (BackgroundTasks): The background tasks object for logging.
background_tasks (BackgroundTasks): The background tasks dependency.
inputs (Optional[InputValueRequest], optional): The input values for the vertex. Defaults to None.
chat_service (ChatService, optional): The chat service dependency. Defaults to Depends(get_chat_service).
current_user (Any, optional): The current user dependency. Defaults to Depends(get_current_active_user).
Expand Down Expand Up @@ -211,8 +212,6 @@ async def build_vertex(
)
top_level_vertices = graph.run_manager.get_top_level_vertices(graph, next_runnable_vertices)

result_data_response = ResultDataResponse(**result_dict.model_dump())

result_data_response = ResultDataResponse.model_validate(result_dict, from_attributes=True)
except Exception as exc:
if isinstance(exc, ComponentBuildException):
Expand Down Expand Up @@ -265,7 +264,7 @@ async def build_vertex(
if graph.stop_vertex and graph.stop_vertex in next_runnable_vertices:
next_runnable_vertices = [graph.stop_vertex]

if not next_runnable_vertices:
if not graph.run_manager.vertices_to_run and not next_runnable_vertices:
background_tasks.add_task(graph.end_all_traces)

build_response = VertexBuildResponse(
Expand Down
3 changes: 3 additions & 0 deletions src/backend/base/langflow/graph/graph/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1469,6 +1469,9 @@ def find_runnable_predecessors_for_successors(self, vertex_id: str) -> List[str]
def remove_from_predecessors(self, vertex_id: str):
    """Delegate to the run manager: drop ``vertex_id`` from predecessor tracking."""
    self.run_manager.remove_from_predecessors(vertex_id)

def remove_vertex_from_runnables(self, vertex_id: str):
    """Delegate to the run manager: remove ``vertex_id`` from the runnable set."""
    self.run_manager.remove_vertex_from_runnables(vertex_id)

def build_in_degree(self, edges: List[ContractEdge]) -> Dict[str, int]:
in_degree: Dict[str, int] = defaultdict(int)
for edge in edges:
Expand Down
15 changes: 11 additions & 4 deletions src/backend/base/langflow/services/telemetry/service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import contextlib
import os
import platform
from datetime import datetime, timezone
Expand Down Expand Up @@ -30,7 +31,7 @@ def __init__(self, settings_service: "SettingsService"):
self.settings_service = settings_service
self.base_url = settings_service.settings.telemetry_base_url
self.telemetry_queue: asyncio.Queue = asyncio.Queue()
self.client = httpx.AsyncClient(timeout=None)
self.client = httpx.AsyncClient(timeout=10.0) # Set a reasonable timeout
self.running = False
self.package = get_version_info()["package"]

Expand Down Expand Up @@ -63,8 +64,12 @@ async def send_telemetry_data(self, payload: BaseModel, path: str | None = None)
logger.error(f"Failed to send telemetry data: {response.status_code} {response.text}")
else:
logger.debug("Telemetry data sent successfully.")
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error occurred: {e}")
except httpx.RequestError as e:
logger.error(f"Request error occurred: {e}")
except Exception as e:
logger.error(f"Failed to send telemetry data due to: {e}")
logger.error(f"Unexpected error occurred: {e}")

async def log_package_run(self, payload: RunPayload):
await self.telemetry_queue.put((self.send_telemetry_data, payload, "run"))
Expand Down Expand Up @@ -119,8 +124,10 @@ async def stop(self):
try:
self.running = False
await self.flush()
self.worker_task.cancel()
if self.worker_task:
await self.worker_task
self.worker_task.cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good finding — I saw the error message multiple times.

with contextlib.suppress(asyncio.CancelledError):
await self.worker_task
await self.client.aclose()
except Exception as e:
logger.error(f"Error stopping tracing service: {e}")
2 changes: 0 additions & 2 deletions src/backend/base/langflow/services/tracing/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from typing import TYPE_CHECKING, Any, Dict, Optional
from uuid import UUID

from langchain.callbacks.tracers.langchain import wait_for_all_tracers
from loguru import logger

from langflow.schema.data import Data
Expand Down Expand Up @@ -292,5 +291,4 @@ def end(
self._run_tree.add_metadata(metadata)
self._run_tree.end(outputs=outputs, error=error)
self._run_tree.post()
wait_for_all_tracers()
self._run_link = self._run_tree.get_url()