Skip to content

Commit

Permalink
refactor: update OpenTelemetry initialization to avoid multiple calls (
Browse files Browse the repository at this point in the history
…#4306)

* Ensure single initialization of OpenTelemetry and optimize meter provider setup

* Simplify condition by removing redundant class attribute
  • Loading branch information
ogabrielluiz authored Oct 29, 2024
1 parent ccba14b commit b39ee0a
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 12 deletions.
Binary file removed localhost-19-2.db
Binary file not shown.
35 changes: 23 additions & 12 deletions src/backend/base/langflow/services/telemetry/opentelemetry.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import threading
import warnings
from collections.abc import Mapping
from enum import Enum
from typing import Any
Expand Down Expand Up @@ -110,6 +109,8 @@ class OpenTelemetry(metaclass=ThreadSafeSingletonMetaUsingWeakref):
_metrics_registry: dict[str, Metric] = {}
_metrics: dict[str, Counter | ObservableGaugeWrapper | Histogram | UpDownCounter] = {}
_meter_provider: MeterProvider | None = None
_initialized: bool = False # Add initialization flag
prometheus_enabled: bool = True

def _add_metric(
self, name: str, description: str, unit: str, metric_type: MetricType, labels: dict[str, bool]
Expand Down Expand Up @@ -141,20 +142,29 @@ def _register_metric(self) -> None:
)

def __init__(self, *, prometheus_enabled: bool = True):
# Only initialize once
self.prometheus_enabled = prometheus_enabled
if OpenTelemetry._initialized:
return

if not self._metrics_registry:
self._register_metric()

if self._meter_provider is None:
resource = Resource.create({"service.name": "langflow"})
metric_readers = []
# Get existing meter provider if any
existing_provider = metrics.get_meter_provider()

# configure prometheus exporter
self.prometheus_enabled = prometheus_enabled
if prometheus_enabled:
metric_readers.append(PrometheusMetricReader())
# Check if FastAPI instrumentation is already set up
if hasattr(existing_provider, "get_meter") and existing_provider.get_meter("http.server"):
self._meter_provider = existing_provider
else:
resource = Resource.create({"service.name": "langflow"})
metric_readers = []
if self.prometheus_enabled:
metric_readers.append(PrometheusMetricReader())

self._meter_provider = MeterProvider(resource=resource, metric_readers=metric_readers)
metrics.set_meter_provider(self._meter_provider)
self._meter_provider = MeterProvider(resource=resource, metric_readers=metric_readers)
metrics.set_meter_provider(self._meter_provider)

self.meter = self._meter_provider.get_meter(langflow_meter_name)

Expand All @@ -163,11 +173,12 @@ def __init__(self, *, prometheus_enabled: bool = True):
msg = f"Key '{name}' does not match metric name '{metric.name}'"
raise ValueError(msg)
if name not in self._metrics:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
self._metrics[metric.name] = self._create_metric(metric)
self._metrics[metric.name] = self._create_metric(metric)

OpenTelemetry._initialized = True

def _create_metric(self, metric):
# Remove _created_instruments check
if metric.name in self._metrics:
return self._metrics[metric.name]

Expand Down

0 comments on commit b39ee0a

Please sign in to comment.