From 6e006354c3b7c2632263dbfc3efb19634254f353 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 15 Jul 2024 16:16:23 -0300 Subject: [PATCH] fix: update telemetry schema and fix telemtry calls (#2708) --- src/backend/base/langflow/api/v1/chat.py | 12 +++++++----- .../base/langflow/services/telemetry/schema.py | 1 + .../base/langflow/services/telemetry/service.py | 6 +++--- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index fa3629f2bbc..e494288c5b7 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -173,6 +173,7 @@ async def build_vertex( next_runnable_vertices = [] top_level_vertices = [] start_time = time.perf_counter() + error_message = None try: cache = await chat_service.get_cache(flow_id_str) if not cache: @@ -215,6 +216,7 @@ async def build_vertex( params = format_exception_message(exc) message = {"errorMessage": params, "stackTrace": tb} valid = False + error_message = params output_label = vertex.outputs[0]["name"] if vertex.outputs else "output" outputs = {output_label: OutputLog(message=message, type="error")} result_data_response = ResultDataResponse(results={}, outputs=outputs) @@ -231,7 +233,7 @@ async def build_vertex( background_tasks.add_task( log_vertex_build, flow_id=flow_id_str, - vertex_id=vertex_id, + vertex_id=vertex_id.split("-")[0], valid=valid, params=params, data=result_data_response, @@ -256,7 +258,7 @@ async def build_vertex( if graph.stop_vertex and graph.stop_vertex in next_runnable_vertices: next_runnable_vertices = [graph.stop_vertex] - if graph.run_manager.all_predecessors_are_fulfilled() and not next_runnable_vertices: + if not graph.run_manager.vertices_to_run and not next_runnable_vertices: background_tasks.add_task(graph.end_all_traces) build_response = VertexBuildResponse( @@ -271,10 +273,10 @@ async def build_vertex( background_tasks.add_task( telemetry_service.log_package_component, ComponentPayload( - componentName=vertex_id, + componentName=vertex_id.split("-")[0], componentSeconds=int(time.perf_counter() - start_time), componentSuccess=valid, - componentErrorMessage=params, + componentErrorMessage=error_message, ), ) return build_response @@ -282,7 +284,7 @@ async def build_vertex( background_tasks.add_task( telemetry_service.log_package_component, ComponentPayload( - componentName=vertex_id, + componentName=vertex_id.split("-")[0], componentSeconds=int(time.perf_counter() - start_time), componentSuccess=False, componentErrorMessage=str(exc), diff --git a/src/backend/base/langflow/services/telemetry/schema.py b/src/backend/base/langflow/services/telemetry/schema.py index 9d62e128a18..d4b80f3f2aa 100644 --- a/src/backend/base/langflow/services/telemetry/schema.py +++ b/src/backend/base/langflow/services/telemetry/schema.py @@ -13,6 +13,7 @@ class ShutdownPayload(BaseModel): class VersionPayload(BaseModel): + package: str version: str platform: str python: str diff --git a/src/backend/base/langflow/services/telemetry/service.py b/src/backend/base/langflow/services/telemetry/service.py index b91c3c9b421..53a46838abd 100644 --- a/src/backend/base/langflow/services/telemetry/service.py +++ b/src/backend/base/langflow/services/telemetry/service.py @@ -8,9 +8,9 @@ import httpx from loguru import logger from pydantic import BaseModel -from langflow.services.telemetry.opentelemetry import OpenTelemetry from langflow.services.base import Service +from langflow.services.telemetry.opentelemetry import OpenTelemetry from langflow.services.telemetry.schema import ( ComponentPayload, PlaygroundPayload, @@ -34,7 +34,6 @@ def __init__(self, settings_service: "SettingsService"): self.telemetry_queue: asyncio.Queue = asyncio.Queue() self.client = httpx.AsyncClient(timeout=10.0) # Set a reasonable timeout self.running = False - self.package = get_version_info()["package"] self.ot = OpenTelemetry(prometheus_enabled=settings_service.settings.prometheus_enabled) @@ -58,7 +57,7 @@ async def send_telemetry_data(self, payload: BaseModel, path: str | None = None) logger.debug("Telemetry tracking is disabled.") return - url = f"{self.base_url}/{self.package.lower()}" + url = f"{self.base_url}" if path: url = f"{url}/{path}" try: @@ -86,6 +85,7 @@ async def log_package_version(self): version_info = get_version_info() architecture = platform.architecture()[0] payload = VersionPayload( + package=version_info["package"].lower(), version=version_info["version"], platform=platform.platform(), python=python_version,