Skip to content

Commit

Permalink
fix: update telemetry schema and fix telemtry calls (#2708)
Browse files Browse the repository at this point in the history
  • Loading branch information
ogabrielluiz authored Jul 15, 2024
1 parent 4268ee4 commit 6e00635
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 8 deletions.
12 changes: 7 additions & 5 deletions src/backend/base/langflow/api/v1/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ async def build_vertex(
next_runnable_vertices = []
top_level_vertices = []
start_time = time.perf_counter()
error_message = None
try:
cache = await chat_service.get_cache(flow_id_str)
if not cache:
Expand Down Expand Up @@ -215,6 +216,7 @@ async def build_vertex(
params = format_exception_message(exc)
message = {"errorMessage": params, "stackTrace": tb}
valid = False
error_message = params
output_label = vertex.outputs[0]["name"] if vertex.outputs else "output"
outputs = {output_label: OutputLog(message=message, type="error")}
result_data_response = ResultDataResponse(results={}, outputs=outputs)
Expand All @@ -231,7 +233,7 @@ async def build_vertex(
background_tasks.add_task(
log_vertex_build,
flow_id=flow_id_str,
vertex_id=vertex_id,
vertex_id=vertex_id.split("-")[0],
valid=valid,
params=params,
data=result_data_response,
Expand All @@ -256,7 +258,7 @@ async def build_vertex(
if graph.stop_vertex and graph.stop_vertex in next_runnable_vertices:
next_runnable_vertices = [graph.stop_vertex]

if graph.run_manager.all_predecessors_are_fulfilled() and not next_runnable_vertices:
if not graph.run_manager.vertices_to_run and not next_runnable_vertices:
background_tasks.add_task(graph.end_all_traces)

build_response = VertexBuildResponse(
Expand All @@ -271,18 +273,18 @@ async def build_vertex(
background_tasks.add_task(
telemetry_service.log_package_component,
ComponentPayload(
componentName=vertex_id,
componentName=vertex_id.split("-")[0],
componentSeconds=int(time.perf_counter() - start_time),
componentSuccess=valid,
componentErrorMessage=params,
componentErrorMessage=error_message,
),
)
return build_response
except Exception as exc:
background_tasks.add_task(
telemetry_service.log_package_component,
ComponentPayload(
componentName=vertex_id,
componentName=vertex_id.split("-")[0],
componentSeconds=int(time.perf_counter() - start_time),
componentSuccess=False,
componentErrorMessage=str(exc),
Expand Down
1 change: 1 addition & 0 deletions src/backend/base/langflow/services/telemetry/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class ShutdownPayload(BaseModel):


class VersionPayload(BaseModel):
package: str
version: str
platform: str
python: str
Expand Down
6 changes: 3 additions & 3 deletions src/backend/base/langflow/services/telemetry/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
import httpx
from loguru import logger
from pydantic import BaseModel
from langflow.services.telemetry.opentelemetry import OpenTelemetry

from langflow.services.base import Service
from langflow.services.telemetry.opentelemetry import OpenTelemetry
from langflow.services.telemetry.schema import (
ComponentPayload,
PlaygroundPayload,
Expand All @@ -34,7 +34,6 @@ def __init__(self, settings_service: "SettingsService"):
self.telemetry_queue: asyncio.Queue = asyncio.Queue()
self.client = httpx.AsyncClient(timeout=10.0) # Set a reasonable timeout
self.running = False
self.package = get_version_info()["package"]

self.ot = OpenTelemetry(prometheus_enabled=settings_service.settings.prometheus_enabled)

Expand All @@ -58,7 +57,7 @@ async def send_telemetry_data(self, payload: BaseModel, path: str | None = None)
logger.debug("Telemetry tracking is disabled.")
return

url = f"{self.base_url}/{self.package.lower()}"
url = f"{self.base_url}"
if path:
url = f"{url}/{path}"
try:
Expand Down Expand Up @@ -86,6 +85,7 @@ async def log_package_version(self):
version_info = get_version_info()
architecture = platform.architecture()[0]
payload = VersionPayload(
package=version_info["package"].lower(),
version=version_info["version"],
platform=platform.platform(),
python=python_version,
Expand Down

0 comments on commit 6e00635

Please sign in to comment.