diff --git a/agents-core/tests/test_timer.py b/agents-core/tests/test_timer.py new file mode 100644 index 00000000..81e83347 --- /dev/null +++ b/agents-core/tests/test_timer.py @@ -0,0 +1,446 @@ +"""Tests for the Timer class in observability metrics.""" + +import asyncio +import pytest +from unittest.mock import MagicMock +from vision_agents.core.observability.metrics import Timer + + +@pytest.fixture +def mock_histogram(): + """Create a mock histogram for testing.""" + return MagicMock() + + +class TestTimerContextManager: + """Tests for Timer used as a context manager.""" + + def test_context_manager_records_timing(self, mock_histogram): + """Test that Timer records elapsed time when used as context manager.""" + with Timer(mock_histogram) as timer: + pass # Do nothing, just measure overhead + + # Verify record was called + mock_histogram.record.assert_called_once() + call_args = mock_histogram.record.call_args + + # First argument should be elapsed time in ms + elapsed_ms = call_args[0][0] + assert isinstance(elapsed_ms, float) + assert elapsed_ms >= 0 + + # Should have recorded the elapsed time + assert timer.last_elapsed_ms is not None + assert timer.last_elapsed_ms >= 0 + + def test_context_manager_with_base_attributes(self, mock_histogram): + """Test that base attributes are included in recording.""" + base_attrs = {"provider": "test", "version": "1.0"} + + with Timer(mock_histogram, base_attrs): + pass + + # Verify attributes were passed + call_args = mock_histogram.record.call_args + recorded_attrs = call_args[1]["attributes"] + + assert "provider" in recorded_attrs + assert recorded_attrs["provider"] == "test" + assert "version" in recorded_attrs + assert recorded_attrs["version"] == "1.0" + + def test_context_manager_with_dynamic_attributes(self, mock_histogram): + """Test that dynamic attributes can be added during execution.""" + with Timer(mock_histogram, {"base": "value"}) as timer: + timer.attributes["dynamic"] = "added" + timer.attributes["count"] = 42 + + # Verify both base and dynamic attributes were recorded + call_args = mock_histogram.record.call_args + recorded_attrs = call_args[1]["attributes"] + + assert recorded_attrs["base"] == "value" + assert recorded_attrs["dynamic"] == "added" + assert recorded_attrs["count"] == 42 + + def test_context_manager_exception_tracking(self, mock_histogram): + """Test that exceptions are tracked in attributes.""" + try: + with Timer(mock_histogram, record_exceptions=True): + raise ValueError("test error") + except ValueError: + pass + + # Verify exception was recorded + call_args = mock_histogram.record.call_args + recorded_attrs = call_args[1]["attributes"] + + assert recorded_attrs["exception"] == "true" + assert recorded_attrs["exception_type"] == "ValueError" + + def test_context_manager_no_exception(self, mock_histogram): + """Test that no exception is recorded when code succeeds.""" + with Timer(mock_histogram, record_exceptions=True): + pass + + call_args = mock_histogram.record.call_args + recorded_attrs = call_args[1]["attributes"] + + assert recorded_attrs["exception"] == "false" + assert "exception_type" not in recorded_attrs + + def test_direct_call_pattern(self, mock_histogram): + """Test Timer used with direct call pattern.""" + timer = Timer(mock_histogram, {"base": "attr"}) + + # Simulate some work + import time + + time.sleep(0.01) + + # Call with extra attributes + elapsed = timer({"phase": "init"}) + + # Verify recording + assert elapsed > 0 + mock_histogram.record.assert_called_once() + + call_args = 
mock_histogram.record.call_args + recorded_attrs = call_args[1]["attributes"] + + assert recorded_attrs["base"] == "attr" + assert recorded_attrs["phase"] == "init" + + def test_stop_is_idempotent(self, mock_histogram): + """Test that calling stop multiple times only records once.""" + timer = Timer(mock_histogram) + + timer.stop() + timer.stop() + timer.stop() + + # Should only be called once + assert mock_histogram.record.call_count == 1 + + +class TestTimerDecorator: + """Tests for Timer used as a decorator.""" + + def test_sync_function_decorator(self, mock_histogram): + """Test decorating a synchronous function.""" + + @Timer(mock_histogram, {"func": "test"}) + def my_function(x, y): + return x + y + + result = my_function(2, 3) + + assert result == 5 + mock_histogram.record.assert_called_once() + + call_args = mock_histogram.record.call_args + recorded_attrs = call_args[1]["attributes"] + assert recorded_attrs["func"] == "test" + + async def test_async_function_decorator(self, mock_histogram): + """Test decorating an async function.""" + + @Timer(mock_histogram, {"func": "async_test"}) + async def my_async_function(x): + await asyncio.sleep(0.01) + return x * 2 + + result = await my_async_function(5) + + assert result == 10 + mock_histogram.record.assert_called_once() + + call_args = mock_histogram.record.call_args + recorded_attrs = call_args[1]["attributes"] + assert recorded_attrs["func"] == "async_test" + + def test_method_decorator_adds_class_name(self, mock_histogram): + """Test that decorating a method automatically adds class name.""" + + class MyClass: + @Timer(mock_histogram, {"method": "process"}) + def process(self): + return "processed" + + instance = MyClass() + result = instance.process() + + assert result == "processed" + mock_histogram.record.assert_called_once() + + call_args = mock_histogram.record.call_args + recorded_attrs = call_args[1]["attributes"] + + # Should automatically add fully qualified class path + assert "class" in recorded_attrs + # Check it ends with the class name (module path will vary) + assert recorded_attrs["class"].endswith(".MyClass") + assert recorded_attrs["method"] == "process" + + async def test_async_method_decorator_adds_class_name(self, mock_histogram): + """Test that decorating an async method adds class name.""" + + class MyAsyncClass: + @Timer(mock_histogram) + async def async_process(self): + await asyncio.sleep(0.01) + return "async_processed" + + instance = MyAsyncClass() + result = await instance.async_process() + + assert result == "async_processed" + mock_histogram.record.assert_called_once() + + call_args = mock_histogram.record.call_args + recorded_attrs = call_args[1]["attributes"] + + assert "class" in recorded_attrs + assert recorded_attrs["class"].endswith(".MyAsyncClass") + + +class TestTimerInheritedMethods: + """Tests for Timer with inherited methods - the bug fix.""" + + def test_inherited_method_reports_subclass_name(self, mock_histogram): + """Test that inherited methods report the actual subclass name.""" + + class BaseClass: + @Timer(mock_histogram) + def process(self): + return "processed" + + class SubClassA(BaseClass): + pass + + class SubClassB(BaseClass): + pass + + # Test SubClassA + instance_a = SubClassA() + instance_a.process() + + # Test SubClassB + instance_b = SubClassB() + instance_b.process() + + # Should have been called twice + assert mock_histogram.record.call_count == 2 + + # Check first call (SubClassA) + first_call = mock_histogram.record.call_args_list[0] + first_attrs = 
first_call[1]["attributes"] + assert first_attrs["class"].endswith(".SubClassA") + + # Check second call (SubClassB) + second_call = mock_histogram.record.call_args_list[1] + second_attrs = second_call[1]["attributes"] + assert second_attrs["class"].endswith(".SubClassB") + + async def test_inherited_async_method_reports_subclass_name(self, mock_histogram): + """Test that inherited async methods report the actual subclass name.""" + + class AsyncBaseClass: + @Timer(mock_histogram) + async def process(self): + await asyncio.sleep(0.01) + return "processed" + + class AsyncSubClass(AsyncBaseClass): + pass + + instance = AsyncSubClass() + await instance.process() + + mock_histogram.record.assert_called_once() + + call_args = mock_histogram.record.call_args + recorded_attrs = call_args[1]["attributes"] + + # Should report the subclass path, not the base class + assert recorded_attrs["class"].endswith(".AsyncSubClass") + + def test_deeply_nested_inheritance(self, mock_histogram): + """Test that deep inheritance chains still report the correct class.""" + + class GrandParent: + @Timer(mock_histogram) + def process(self): + return "processed" + + class Parent(GrandParent): + pass + + class Child(Parent): + pass + + instance = Child() + instance.process() + + call_args = mock_histogram.record.call_args + recorded_attrs = call_args[1]["attributes"] + + # Should report the most specific class path + assert recorded_attrs["class"].endswith(".Child") + + +class TestTimerUnits: + """Tests for Timer unit conversions.""" + + def test_millisecond_unit_default(self, mock_histogram): + """Test that default unit is milliseconds.""" + with Timer(mock_histogram): + pass + + call_args = mock_histogram.record.call_args + elapsed = call_args[0][0] + + # Value should be in milliseconds (small positive number) + assert elapsed >= 0 + assert elapsed < 1000 # Should be less than 1 second for this test + + def test_second_unit_conversion(self, mock_histogram): + """Test that seconds unit converts correctly.""" + with Timer(mock_histogram, unit="s"): + import time + + time.sleep(0.01) # Sleep 10ms + + call_args = mock_histogram.record.call_args + elapsed_seconds = call_args[0][0] + + # Should be approximately 0.01 seconds + assert 0.005 < elapsed_seconds < 0.05 + + +class TestTimerEdgeCases: + """Tests for edge cases and error conditions.""" + + def test_timer_without_stop_in_context_manager(self, mock_histogram): + """Test that __exit__ always calls stop.""" + with Timer(mock_histogram) as timer: + # Don't call stop manually + pass + + # Should have been called by __exit__ + mock_histogram.record.assert_called_once() + assert timer.last_elapsed_ms is not None + + def test_restart_clears_attributes(self, mock_histogram): + """Test that restart clears dynamic attributes.""" + timer = Timer(mock_histogram) + + # First use + timer.attributes["first"] = "value1" + timer.stop() + + # Restart and use again + timer._restart() + timer.attributes["second"] = "value2" + timer.stop({"extra": "attr"}) + + # Second call should only have "second" and "extra", not "first" + second_call = mock_histogram.record.call_args_list[1] + second_attrs = second_call[1]["attributes"] + + assert "second" in second_attrs + assert "extra" in second_attrs + assert "first" not in second_attrs + + def test_elapsed_ms_while_running(self, mock_histogram): + """Test that elapsed_ms can be called while timer is running.""" + with Timer(mock_histogram) as timer: + import time + + time.sleep(0.01) + elapsed = timer.elapsed_ms() + assert elapsed > 0 + + # 
Final elapsed should be >= interim elapsed + assert timer.last_elapsed_ms >= elapsed + + def test_callable_check_in_call(self, mock_histogram): + """Test that __call__ with callable argument triggers decoration.""" + + def my_func(): + return 42 + + timer = Timer(mock_histogram) + decorated = timer(my_func) + + # Should return a wrapped function + assert callable(decorated) + assert decorated() == 42 + mock_histogram.record.assert_called_once() + + +class TestTimerRealWorldScenarios: + """Tests simulating real-world usage patterns.""" + + async def test_stt_pattern(self, mock_histogram): + """Test the pattern used in STT base class.""" + + class STT: + async def process_audio(self, audio_data): + with Timer(mock_histogram) as timer: + timer.attributes["provider"] = self.__class__.__name__ + timer.attributes["samples"] = len(audio_data) + + # Simulate processing + await asyncio.sleep(0.01) + + class DeepgramSTT(STT): + pass + + stt = DeepgramSTT() + await stt.process_audio([1, 2, 3, 4, 5]) + + call_args = mock_histogram.record.call_args + recorded_attrs = call_args[1]["attributes"] + + assert recorded_attrs["provider"] == "DeepgramSTT" + assert recorded_attrs["samples"] == 5 + + def test_turn_detection_pattern(self, mock_histogram): + """Test the pattern used in TurnDetector base class.""" + + class TurnDetector: + @Timer(mock_histogram) + async def process_audio(self, audio_data): + await asyncio.sleep(0.01) + return "turn_detected" + + class SmartTurnDetection(TurnDetector): + pass + + detector = SmartTurnDetection() + result = asyncio.run(detector.process_audio([1, 2, 3])) + + assert result == "turn_detected" + + call_args = mock_histogram.record.call_args + recorded_attrs = call_args[1]["attributes"] + + # Should report the actual implementation class path + assert recorded_attrs["class"].endswith(".SmartTurnDetection") + + def test_multiple_nested_timers(self, mock_histogram): + """Test that nested timers work independently.""" + with Timer(mock_histogram, {"outer": "timer"}): + with Timer(mock_histogram, {"inner": "timer"}): + pass + + # Both should have recorded + assert mock_histogram.record.call_count == 2 + + # Check both calls had different attributes + first_call_attrs = mock_histogram.record.call_args_list[0][1]["attributes"] + second_call_attrs = mock_histogram.record.call_args_list[1][1]["attributes"] + + assert first_call_attrs["inner"] == "timer" + assert second_call_attrs["outer"] == "timer" diff --git a/agents-core/vision_agents/core/llm/llm.py b/agents-core/vision_agents/core/llm/llm.py index 1871bcd7..3cb76728 100644 --- a/agents-core/vision_agents/core/llm/llm.py +++ b/agents-core/vision_agents/core/llm/llm.py @@ -76,6 +76,35 @@ async def simple_response( processors: Optional[List[Processor]] = None, participant: Optional[Participant] = None, ) -> LLMResponseEvent[Any]: + """ + Wrapper method that tracks metrics and delegates to _simple_response. 
+ """ + from vision_agents.core.observability.metrics import Timer, llm_latency_ms + + with Timer(llm_latency_ms) as timer: + timer.attributes["llm_class"] = ( + f"{self.__class__.__module__}.{self.__class__.__qualname__}" + ) + timer.attributes["provider"] = getattr(self, "provider_name", "unknown") + + try: + result = await self._simple_response(text, processors, participant) + return result + except Exception as e: + timer.attributes["error"] = type(e).__name__ + raise + + @abc.abstractmethod + async def _simple_response( + self, + text: str, + processors: Optional[List[Processor]] = None, + participant: Optional[Participant] = None, + ) -> LLMResponseEvent[Any]: + """ + Implementation-specific response generation. + Subclasses must implement this method. + """ raise NotImplementedError def _build_enhanced_instructions(self) -> Optional[str]: diff --git a/agents-core/vision_agents/core/observability/metrics.py b/agents-core/vision_agents/core/observability/metrics.py index 86b7bd85..b39ccd2a 100644 --- a/agents-core/vision_agents/core/observability/metrics.py +++ b/agents-core/vision_agents/core/observability/metrics.py @@ -1,51 +1,14 @@ -"""OpenTelemetry observability instrumentation for vision-agents library. - -This module defines metrics and tracers for the vision-agents library. It does NOT -configure OpenTelemetry providers - that is the responsibility of applications using -this library. - -For applications using this library: - To enable telemetry, configure OpenTelemetry in your application before importing - vision-agents components: - - ```python - from opentelemetry import trace, metrics - from opentelemetry.sdk.trace import TracerProvider - from opentelemetry.sdk.metrics import MeterProvider - from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter - from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter - from opentelemetry.sdk.trace.export import BatchSpanProcessor - from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader - from opentelemetry.sdk.resources import Resource - - # Configure your service - resource = Resource.create({ - "service.name": "my-voice-app", - "service.version": "1.0.0", - }) - - # Setup trace provider - trace_provider = TracerProvider(resource=resource) - trace_provider.add_span_processor( - BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317")) - ) - trace.set_tracer_provider(trace_provider) - - # Setup metrics provider - metric_reader = PeriodicExportingMetricReader( - OTLPMetricExporter(endpoint="http://localhost:4317") - ) - metrics_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) - metrics.set_meter_provider(metrics_provider) - - # Now import and use vision-agents - from vision_agents.core.tts import TTS - ``` - - If no providers are configured, metrics and traces will be no-ops. 
-""" +from __future__ import annotations + +import functools +import inspect +from typing import Dict, Any, Optional, Mapping, Callable, Awaitable, TypeVar, Union from opentelemetry import trace, metrics +from opentelemetry.metrics import Histogram +import time + +R = TypeVar("R") # Get tracer and meter using the library name # These will use whatever providers the application has configured @@ -75,3 +38,194 @@ inflight_ops = meter.create_up_down_counter( "voice.ops.inflight", description="Inflight voice ops" ) + +turn_detection_latency_ms = meter.create_histogram( + "turn.detection.latency.ms", + unit="ms", +) +turn_vad_latency_ms = meter.create_histogram( + "turn.vad.latency.ms", unit="ms", description="Turn detection VAD latency" +) +turn_end_detection_latency_ms = meter.create_histogram( + "turn.end_detection.latency.ms", + unit="ms", + description="Turn end detection latency (Vogent/Smart Turn model)", +) +turn_errors = meter.create_counter("turn.errors", description="Turn detection errors") + +llm_latency_ms = meter.create_histogram( + "llm.latency.ms", unit="ms", description="Total LLM latency" +) +llm_errors = meter.create_counter("llm.errors", description="LLM errors") + + +class Timer: + """ + Can be used as: + done = Timer(hist, {"attr": 1}) + ... + done({"phase": "init"}) + + with Timer(hist, {"attr": 1}) as timer: + timer.attributes["dynamic_key"] = "dynamic_value" + ... + + @Timer(hist, {"route": "/join"}) + def handler(...): ... + + @Timer(hist) + async def async_handler(...): ... + + If decorating a method, automatically adds {"class": } to attributes. + + When used as a context manager, you can add attributes dynamically via the + `attributes` property, which will be merged with base attributes when recording. + """ + + def __init__( + self, + hist: Histogram, + attributes: Optional[Mapping[str, Any]] = None, + *, + unit: str = "ms", + record_exceptions: bool = True, + ) -> None: + self._hist = hist + self._base_attrs: Dict[str, Any] = dict(attributes or {}) + self._unit = unit + self._record_exceptions = record_exceptions + + self._start_ns = time.perf_counter_ns() + self._stopped = False + self.last_elapsed_ms: Optional[float] = None + + # Public attributes dictionary that can be modified during context manager usage + self.attributes: Dict[str, Any] = {} + + def __call__(self, *args, **kwargs): + """If called with a function, act as a decorator; else record.""" + if args and callable(args[0]) and len(args) == 1 and not kwargs: + func = args[0] + return self._decorate(func) + extra_attrs = args[0] if args else None + return self.stop(extra_attrs) + + def __enter__(self) -> "Timer": + self._restart() + return self + + def __exit__(self, exc_type, exc, tb) -> None: + attrs: Dict[str, Any] = {} + if self._record_exceptions: + attrs["exception"] = "true" if exc_type else "false" + if exc_type: + attrs["exception_type"] = getattr(exc_type, "__name__", str(exc_type)) + self.stop(attrs) + + def stop(self, extra_attributes: Optional[Mapping[str, Any]] = None) -> float: + """Idempotent: records only once per start.""" + if not self._stopped: + self._stopped = True + elapsed = self.elapsed_ms() + self.last_elapsed_ms = elapsed + + attrs = {**self._base_attrs} + # Merge the dynamic attributes set during context manager usage + attrs.update(self.attributes) + if extra_attributes: + attrs.update(dict(extra_attributes)) + + value = elapsed if self._unit == "ms" else elapsed / 1000.0 + self._hist.record(value, attributes=attrs) + + return self.last_elapsed_ms or 0.0 + + def 
elapsed_ms(self) -> float: + return (time.perf_counter_ns() - self._start_ns) / 1_000_000.0 + + def _restart(self) -> None: + self._start_ns = time.perf_counter_ns() + self._stopped = False + self.last_elapsed_ms = None + self.attributes = {} # Reset dynamic attributes on restart + + def _decorate( + self, func: Union[Callable[..., R], Callable[..., Awaitable[R]]] + ) -> Union[Callable[..., R], Callable[..., Awaitable[R]]]: + """ + Decorate a function or method. + Automatically adds {"class": } if decorating a bound method. + """ + + is_async = inspect.iscoroutinefunction(func) + + if is_async: + # Type-cast func as async for type checker + async_func: Callable[..., Awaitable[R]] = func # type: ignore[assignment] + + @functools.wraps(async_func) + async def async_wrapper(*args, **kwargs) -> R: + class_name = _get_class_name_from_args(async_func, args) + attrs = {**self._base_attrs} + if class_name: + attrs["class"] = class_name + with Timer( + self._hist, + attrs, + unit=self._unit, + record_exceptions=self._record_exceptions, + ): + return await async_func(*args, **kwargs) + + return async_wrapper + else: + # Type-cast func as sync for type checker + sync_func: Callable[..., R] = func # type: ignore[assignment] + + @functools.wraps(sync_func) + def sync_wrapper(*args, **kwargs) -> R: + class_name = _get_class_name_from_args(sync_func, args) + attrs = {**self._base_attrs} + if class_name: + attrs["class"] = class_name + with Timer( + self._hist, + attrs, + unit=self._unit, + record_exceptions=self._record_exceptions, + ): + return sync_func(*args, **kwargs) + + return sync_wrapper + + +def _get_class_name_from_args( + func: Callable[..., Any], args: tuple[Any, ...] +) -> Optional[str]: + """Return fully qualified class path if first arg looks like a bound method (self or cls). + + For instance methods (self), we return the runtime class path (module.Class), not just + the class name. This provides better identification in metrics, especially when multiple + plugins use the same class name (e.g., TTS). + + Returns: + Fully qualified class path like "vision_agents.plugins.cartesia.tts.TTS" + or None if not a method call. + """ + if not args: + return None + + first = args[0] + + # Check if this looks like an instance method call (self parameter) + if hasattr(first, "__class__") and not inspect.isclass(first): + # Verify it's actually a method by checking the function's qualname contains a dot + if "." in func.__qualname__: + # Return the fully qualified class path + return f"{first.__class__.__module__}.{first.__class__.__qualname__}" + + # Check if this looks like a class method call (cls parameter) + if inspect.isclass(first) and func.__qualname__.startswith(first.__name__ + "."): + return f"{first.__module__}.{first.__qualname__}" + + return None diff --git a/agents-core/vision_agents/core/stt/stt.py b/agents-core/vision_agents/core/stt/stt.py index 263c1855..b2f0ea05 100644 --- a/agents-core/vision_agents/core/stt/stt.py +++ b/agents-core/vision_agents/core/stt/stt.py @@ -6,6 +6,7 @@ from ..edge.types import Participant from vision_agents.core.events.manager import EventManager +from ..observability.metrics import Timer, stt_latency_ms, stt_errors from . import events from .events import TranscriptResponse @@ -23,6 +24,7 @@ class STT(abc.ABC): process_audio is currently called every 20ms. The integration with turn keeping could be improved """ + closed: bool = False started: bool = False @@ -39,7 +41,7 @@ def __init__( async def warmup(self) -> None: """ Warm up the STT service. 
- + This method can be overridden by implementations to perform model loading, connection establishment, or other initialization that should happen before the first transcription request. @@ -60,13 +62,15 @@ def _emit_transcript_event( participant: Participant metadata. response: Transcription response metadata. """ - self.events.send(events.STTTranscriptEvent( - session_id=self.session_id, - plugin_name=self.provider_name, - text=text, - participant=participant, - response=response, - )) + self.events.send( + events.STTTranscriptEvent( + session_id=self.session_id, + plugin_name=self.provider_name, + text=text, + participant=participant, + response=response, + ) + ) def _emit_partial_transcript_event( self, @@ -82,13 +86,15 @@ def _emit_partial_transcript_event( participant: Participant metadata. response: Transcription response metadata. """ - self.events.send(events.STTPartialTranscriptEvent( - session_id=self.session_id, - plugin_name=self.provider_name, - text=text, - participant=participant, - response=response, - )) + self.events.send( + events.STTPartialTranscriptEvent( + session_id=self.session_id, + plugin_name=self.provider_name, + text=text, + participant=participant, + response=response, + ) + ) def _emit_error_event( self, @@ -100,20 +106,73 @@ def _emit_error_event( Emit an error event. Note this should only be emitted for temporary errors. Permanent errors due to config etc should be directly raised """ - self.events.send(events.STTErrorEvent( - session_id=self.session_id, - plugin_name=self.provider_name, - error=error, - context=context, - participant=participant, - error_code=getattr(error, "error_code", None), - is_recoverable=not isinstance(error, (SystemExit, KeyboardInterrupt)), - )) + # Increment error counter + stt_errors.add( + 1, {"provider": self.provider_name, "error_type": type(error).__name__} + ) + + self.events.send( + events.STTErrorEvent( + session_id=self.session_id, + plugin_name=self.provider_name, + error=error, + context=context, + participant=participant, + error_code=getattr(error, "error_code", None), + is_recoverable=not isinstance(error, (SystemExit, KeyboardInterrupt)), + ) + ) - @abc.abstractmethod async def process_audio( - self, pcm_data: PcmData, participant: Optional[Participant] = None, + self, + pcm_data: PcmData, + participant: Optional[Participant] = None, + ): + """ + Process audio with automatic metrics tracking. + + This method wraps the actual processing with metrics collection + and delegates to the _process_audio method that subclasses implement. + + Args: + pcm_data: Audio data to process + participant: Optional participant metadata + """ + with Timer(stt_latency_ms) as timer: + # Use fully qualified class path for better identification + timer.attributes["stt_class"] = ( + f"{self.__class__.__module__}.{self.__class__.__qualname__}" + ) + timer.attributes["provider"] = self.provider_name + timer.attributes["sample_rate"] = pcm_data.sample_rate + timer.attributes["channels"] = pcm_data.channels + timer.attributes["samples"] = ( + len(pcm_data.samples) if pcm_data.samples is not None else 0 + ) + timer.attributes["duration_ms"] = pcm_data.duration_ms + + try: + await self._process_audio(pcm_data, participant) + except Exception as e: + timer.attributes["error"] = type(e).__name__ + raise + + @abc.abstractmethod + async def _process_audio( + self, + pcm_data: PcmData, + participant: Optional[Participant] = None, ): + """ + Process audio data and emit transcription events. 
+ + Subclasses must implement this method to perform the actual STT processing. + The base class handles metrics collection automatically. + + Args: + pcm_data: Audio data to process + participant: Optional participant metadata + """ pass async def start(self): diff --git a/agents-core/vision_agents/core/tts/tts.py b/agents-core/vision_agents/core/tts/tts.py index 1cec9448..808ddb74 100644 --- a/agents-core/vision_agents/core/tts/tts.py +++ b/agents-core/vision_agents/core/tts/tts.py @@ -73,7 +73,7 @@ def __init__(self, provider_name: Optional[str] = None): async def warmup(self) -> None: """ Warm up the TTS service. - + This method can be overridden by implementations to perform model loading, connection establishment, or other initialization that should happen before the first synthesis request. @@ -142,7 +142,7 @@ def _emit_chunk( # Resample to desired format if needed pcm = pcm.resample( target_sample_rate=self._desired_sample_rate, - target_channels=self._desired_channels + target_channels=self._desired_channels, ) self.events.send( @@ -297,9 +297,9 @@ async def send( raise finally: elapsed_ms = (time.time() - start_time) * 1000.0 - tts_latency_ms.record( - elapsed_ms, attributes={"tts_class": self.__class__.__name__} - ) + # Use fully qualified class path for better identification + class_path = f"{self.__class__.__module__}.{self.__class__.__qualname__}" + tts_latency_ms.record(elapsed_ms, attributes={"tts_class": class_path}) async def close(self): """Close the TTS service and release any resources.""" diff --git a/agents-core/vision_agents/core/turn_detection/turn_detection.py b/agents-core/vision_agents/core/turn_detection/turn_detection.py index ce460ed3..c2ed0dea 100644 --- a/agents-core/vision_agents/core/turn_detection/turn_detection.py +++ b/agents-core/vision_agents/core/turn_detection/turn_detection.py @@ -8,6 +8,7 @@ from .events import TurnStartedEvent, TurnEndedEvent from ..agents.conversation import Conversation from ..edge.types import Participant +from ..observability.metrics import turn_detection_latency_ms, turn_errors, Timer class TurnEvent(Enum): @@ -17,14 +18,11 @@ class TurnEvent(Enum): TURN_ENDED = "turn_ended" - class TurnDetector(ABC): """Base implementation for turn detection with common functionality.""" def __init__( - self, - confidence_threshold: float = 0.5, - provider_name: Optional[str] = None + self, confidence_threshold: float = 0.5, provider_name: Optional[str] = None ) -> None: self._confidence_threshold = confidence_threshold self.is_active = False @@ -36,28 +34,23 @@ def __init__( async def warmup(self) -> None: """ Warm up the turn detection service. - + This method can be overridden by implementations to perform model loading, connection establishment, or other initialization that should happen before the first audio processing. 
""" pass - def _emit_start_turn_event( - self, event: TurnStartedEvent - ) -> None: + def _emit_start_turn_event(self, event: TurnStartedEvent) -> None: event.session_id = self.session_id event.plugin_name = self.provider_name self.events.send(event) - def _emit_end_turn_event( - self, event: TurnEndedEvent - ) -> None: + def _emit_end_turn_event(self, event: TurnEndedEvent) -> None: event.session_id = self.session_id event.plugin_name = self.provider_name self.events.send(event) - @abstractmethod async def process_audio( self, audio_data: PcmData, @@ -71,6 +64,34 @@ async def process_audio( participant: Participant that's speaking, includes user data conversation: Transcription/ chat history, sometimes useful for turn detection """ + with Timer(turn_detection_latency_ms) as timer: + timer.attributes["class"] = ( + f"{self.__class__.__module__}.{self.__class__.__qualname__}" + ) + timer.attributes["provider"] = self.provider_name + try: + await self.detect_turn(audio_data, participant, conversation) + except Exception as e: + timer.attributes["error"] = type(e).__name__ + turn_errors.add( + 1, {"provider": self.provider_name, "error_type": type(e).__name__} + ) + raise + + @abstractmethod + async def detect_turn( + self, + audio_data: PcmData, + participant: Participant, + conversation: Optional[Conversation], + ) -> None: + """Process the audio and trigger turn start or turn end events + + Args: + audio_data: PcmData object containing audio samples from Stream + participant: Participant that's speaking, includes user data + conversation: Transcription/ chat history, sometimes useful for turn detection + """ ... diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..cb2ce184 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,74 @@ +services: + # Jaeger for distributed tracing + jaeger: + image: jaegertracing/all-in-one:latest + container_name: vision-agents-jaeger + ports: + - "16686:16686" # Jaeger UI + - "4317:4317" # OTLP gRPC receiver + - "4318:4318" # OTLP HTTP receiver + environment: + - COLLECTOR_OTLP_ENABLED=true + networks: + - observability + + # Prometheus for metrics collection + prometheus: + image: prom/prometheus:latest + container_name: vision-agents-prometheus + ports: + - "9090:9090" + volumes: + - ./observability/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml + - prometheus-data:/prometheus + command: + - '--config.file=/etc/prometheus/prometheus.yml' + - '--storage.tsdb.path=/prometheus' + - '--web.console.libraries=/usr/share/prometheus/console_libraries' + - '--web.console.templates=/usr/share/prometheus/consoles' + - '--web.enable-lifecycle' + networks: + - observability + + # Grafana for visualization + grafana: + image: grafana/grafana:latest + container_name: vision-agents-grafana + ports: + - "3000:3000" + environment: + - GF_SECURITY_ADMIN_USER=admin + - GF_SECURITY_ADMIN_PASSWORD=admin + - GF_USERS_ALLOW_SIGN_UP=false + - GF_AUTH_ANONYMOUS_ENABLED=true + - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin + - GF_AUTH_DISABLE_LOGIN_FORM=true + volumes: + - ./observability/grafana/provisioning:/etc/grafana/provisioning + - ./observability/grafana/dashboards:/var/lib/grafana/dashboards + - grafana-data:/var/lib/grafana + depends_on: + - prometheus + networks: + - observability + + # Init service to set home dashboard + grafana-init: + image: curlimages/curl:latest + container_name: vision-agents-grafana-init + volumes: + - ./observability/grafana/init-home-dashboard.sh:/init-home-dashboard.sh:ro + command: sh /init-home-dashboard.sh + 
depends_on: + - grafana + networks: + - observability + restart: "no" + +volumes: + prometheus-data: + grafana-data: + +networks: + observability: + driver: bridge diff --git a/examples/01_simple_agent_example/simple_agent_example.py b/examples/01_simple_agent_example/simple_agent_example.py index a063b765..991527a7 100644 --- a/examples/01_simple_agent_example/simple_agent_example.py +++ b/examples/01_simple_agent_example/simple_agent_example.py @@ -71,11 +71,14 @@ async def join_call(agent: Agent, call_type: str, call_id: str, **kwargs) -> Non def setup_telemetry(): import atexit - from opentelemetry import trace + from opentelemetry import trace, metrics from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.exporter.prometheus import PrometheusMetricReader + from prometheus_client import start_http_server resource = Resource.create( { @@ -88,6 +91,13 @@ def setup_telemetry(): tp.add_span_processor(BatchSpanProcessor(exporter)) trace.set_tracer_provider(tp) + reader = PrometheusMetricReader() + metrics.set_meter_provider( + MeterProvider(resource=resource, metric_readers=[reader]) + ) + + start_http_server(port=9464) + def _flush_and_shutdown(): tp.force_flush() tp.shutdown() diff --git a/observability/.gitignore b/observability/.gitignore new file mode 100644 index 00000000..d577e73b --- /dev/null +++ b/observability/.gitignore @@ -0,0 +1,4 @@ +# Ignore Docker volume data +data/ +*.tmp +*.log diff --git a/observability/README.md b/observability/README.md new file mode 100644 index 00000000..bcd44481 --- /dev/null +++ b/observability/README.md @@ -0,0 +1,179 @@ +# Vision Agents Observability Stack + +This directory contains the complete observability setup for Vision Agents, including: +- **Prometheus** for metrics collection +- **Jaeger** for distributed tracing +- **Grafana** for visualization with pre-configured dashboards + +## Quick Start + +### 1. Start the Observability Stack + +From the root of the Vision Agents repository: + +```bash +docker-compose up -d +``` + +This will start: +- **Jaeger UI**: http://localhost:16686 +- **Prometheus UI**: http://localhost:9090 +- **Grafana**: http://localhost:3000 (admin/admin) + +### 2. Run Your Vision Agents Application + +The example in `examples/01_simple_agent_example/simple_agent_example.py` already includes the `setup_telemetry()` function that: +- Exports traces to Jaeger (OTLP on port 4317) +- Exposes Prometheus metrics on port 9464 + +Run the example: + +```bash +cd examples/01_simple_agent_example +uv run python simple_agent_example.py +``` + +### 3. View Metrics in Grafana + +1. Open Grafana: http://localhost:3000 +2. Login with `admin` / `admin` +3. Navigate to **Dashboards** → **Vision Agents - Performance Metrics** + +The dashboard automatically displays: +- **LLM Latency** (p50, p95, p99) by implementation +- **STT Latency** (p50, p95, p99) by implementation +- **TTS Latency** (p50, p95, p99) by implementation +- **Turn Detection Latency** (p50, p95, p99) by implementation +- **All Errors Rate** - Combined view of LLM, STT, and TTS errors by provider and error type + +### 4. View Traces in Jaeger + +1. Open Jaeger: http://localhost:16686 +2. Select service: `agents` +3. 
Click **Find Traces** to see distributed traces + +## Architecture + +### Metrics Flow + +``` +Vision Agents App (port 9464) + ↓ (scrape every 5s) +Prometheus (port 9090) + ↓ (datasource) +Grafana (port 3000) +``` + +### Traces Flow + +``` +Vision Agents App + ↓ (OTLP gRPC on port 4317) +Jaeger Collector + ↓ +Jaeger UI (port 16686) +``` + +## Available Metrics + +### STT Metrics +- `stt_latency_ms` - Histogram of STT processing latency + - Labels: `stt_class`, `provider`, `sample_rate`, `channels`, `samples`, `duration_ms` +- `stt_errors` - Counter of STT errors + - Labels: `provider`, `error_type` + +### TTS Metrics +- `tts_latency_ms` - Histogram of TTS synthesis latency + - Labels: `tts_class` +- `tts_errors` - Counter of TTS errors + - Labels: `provider`, `error_type` + +### Turn Detection Metrics +- `turn_detection_latency_ms` - Histogram of turn detection latency + - Labels: `class` + +### LLM Metrics +- `llm_latency_ms` - Histogram of LLM response latency + - Labels: `llm_class`, `provider` +- `llm_errors` - Counter of LLM errors + - Labels: `provider`, `error` + +## Configuration + +### Prometheus + +Edit `prometheus/prometheus.yml` to: +- Change scrape interval +- Add additional scrape targets +- Configure alerting rules + +### Grafana + +#### Add Custom Dashboards + +1. Place JSON dashboard files in `grafana/dashboards/` +2. They will be automatically loaded on startup + +#### Modify Datasources + +Edit `grafana/provisioning/datasources/prometheus.yml` + +### Jaeger + +Jaeger is configured with default settings. To customize, modify the `jaeger` service in `docker-compose.yml`. + +## Troubleshooting + +### Prometheus Can't Scrape Metrics + +**Issue**: Prometheus shows target as "down" + +**Solution**: Ensure `host.docker.internal` resolves correctly: +- **Linux**: Add `--add-host=host.docker.internal:host-gateway` to the prometheus service in docker-compose.yml +- **Mac/Windows**: Should work by default + +### No Data in Grafana + +1. Check Prometheus is scraping: http://localhost:9090/targets +2. Verify metrics are exposed: http://localhost:9464/metrics +3. Ensure your Vision Agents app is running with telemetry enabled + +### Jaeger Shows No Traces + +1. Verify OTLP receiver is running: `docker logs vision-agents-jaeger` +2. Check your app's trace exporter configuration +3. Ensure `endpoint="localhost:4317"` in your app + +## Stopping the Stack + +```bash +docker-compose down +``` + +To remove all data (metrics, dashboards, etc.): + +```bash +docker-compose down -v +``` + +## Production Considerations + +This setup is designed for development. For production: + +1. **Security**: + - Change default Grafana password + - Add authentication to Prometheus + - Use TLS for all connections + +2. **Persistence**: + - Configure external volumes for data persistence + - Set up regular backups + +3. **Scalability**: + - Use Prometheus remote write for long-term storage + - Consider Jaeger production deployment with Elasticsearch/Cassandra + - Deploy Grafana with a proper database backend + +4. **Monitoring**: + - Set up alerts in Prometheus/Grafana + - Configure notification channels (Slack, PagerDuty, etc.) 
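
[Editor's note] The wiring this README describes is essentially what `setup_telemetry()` in `examples/01_simple_agent_example/simple_agent_example.py` does after this change. A condensed sketch is below; the `insecure=True` flag and the exact resource attributes are assumptions here (the example's full exporter arguments are not shown in this diff), so adjust them for your deployment:

```python
from opentelemetry import trace, metrics
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from prometheus_client import start_http_server


def setup_telemetry() -> None:
    # "agents" matches the service name the README tells you to select in Jaeger.
    resource = Resource.create({"service.name": "agents"})

    # Traces: export via OTLP gRPC to the Jaeger all-in-one container on port 4317.
    tracer_provider = TracerProvider(resource=resource)
    tracer_provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint="localhost:4317", insecure=True))
    )
    trace.set_tracer_provider(tracer_provider)

    # Metrics: expose a Prometheus scrape endpoint on port 9464.
    reader = PrometheusMetricReader()
    metrics.set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader]))
    start_http_server(port=9464)
```

With this in place, Prometheus scrapes `host.docker.internal:9464` as configured in `observability/prometheus/prometheus.yml`, and Jaeger receives spans on port 4317, which is exactly the flow the Architecture section above describes.
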
diff --git a/observability/grafana/dashboards/vision-agents.json b/observability/grafana/dashboards/vision-agents.json new file mode 100644 index 00000000..22374fcd --- /dev/null +++ b/observability/grafana/dashboards/vision-agents.json @@ -0,0 +1,822 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 1, + "id": null, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "tooltip": false, + "viz": false, + "legend": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 1, + "options": { + "legend": { + "calcs": ["mean", "max"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "10.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "histogram_quantile(0.50, sum(rate(llm_latency_ms_bucket[5m])) by (le, llm_class))", + "legendFormat": "p50 - {{llm_class}}", + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "histogram_quantile(0.95, sum(rate(llm_latency_ms_bucket[5m])) by (le, llm_class))", + "legendFormat": "p95 - {{llm_class}}", + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "histogram_quantile(0.99, sum(rate(llm_latency_ms_bucket[5m])) by (le, llm_class))", + "legendFormat": "p99 - {{llm_class}}", + "refId": "C" + } + ], + "title": "LLM Latency", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "tooltip": false, + "viz": false, + "legend": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, 
+ "w": 12, + "x": 12, + "y": 0 + }, + "id": 2, + "options": { + "legend": { + "calcs": ["mean", "max"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "10.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "histogram_quantile(0.50, sum(rate(stt_latency_ms_bucket[5m])) by (le, stt_class))", + "legendFormat": "p50 - {{stt_class}}", + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "histogram_quantile(0.95, sum(rate(stt_latency_ms_bucket[5m])) by (le, stt_class))", + "legendFormat": "p95 - {{stt_class}}", + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "histogram_quantile(0.99, sum(rate(stt_latency_ms_bucket[5m])) by (le, stt_class))", + "legendFormat": "p99 - {{stt_class}}", + "refId": "C" + } + ], + "title": "STT Latency", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "tooltip": false, + "viz": false, + "legend": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 8 + }, + "id": 3, + "options": { + "legend": { + "calcs": ["mean", "max"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "10.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "histogram_quantile(0.50, sum(rate(tts_latency_ms_bucket[5m])) by (le, tts_class))", + "legendFormat": "p50 - {{tts_class}}", + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "histogram_quantile(0.95, sum(rate(tts_latency_ms_bucket[5m])) by (le, tts_class))", + "legendFormat": "p95 - {{tts_class}}", + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "histogram_quantile(0.99, sum(rate(tts_latency_ms_bucket[5m])) by (le, tts_class))", + "legendFormat": "p99 - {{tts_class}}", + "refId": "C" + } + ], + "title": "TTS Latency", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "tooltip": false, + "viz": false, + "legend": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + 
"showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 8 + }, + "id": 4, + "options": { + "legend": { + "calcs": ["mean", "max"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "10.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "histogram_quantile(0.50, sum(rate(turn_detection_latency_ms_bucket[5m])) by (le, provider))", + "legendFormat": "p50 - {{provider}}", + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "histogram_quantile(0.95, sum(rate(turn_detection_latency_ms_bucket[5m])) by (le, provider))", + "legendFormat": "p95 - {{provider}}", + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "histogram_quantile(0.99, sum(rate(turn_detection_latency_ms_bucket[5m])) by (le, provider))", + "legendFormat": "p99 - {{provider}}", + "refId": "C" + } + ], + "title": "Turn Detection Latency", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "tooltip": false, + "viz": false, + "legend": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 8, + "x": 0, + "y": 16 + }, + "id": 5, + "options": { + "legend": { + "calcs": ["mean", "max"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "10.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "histogram_quantile(0.50, sum(rate(turn_vad_latency_ms_bucket[5m])) by (le, implementation))", + "legendFormat": "p50 - {{implementation}}", + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "histogram_quantile(0.95, sum(rate(turn_vad_latency_ms_bucket[5m])) by (le, implementation))", + "legendFormat": "p95 - {{implementation}}", + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "histogram_quantile(0.99, sum(rate(turn_vad_latency_ms_bucket[5m])) by (le, implementation))", + "legendFormat": "p99 - {{implementation}}", + "refId": "C" + } + ], + "title": "Turn VAD Latency", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { 
+ "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "tooltip": false, + "viz": false, + "legend": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "ms" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 8, + "x": 8, + "y": 16 + }, + "id": 6, + "options": { + "legend": { + "calcs": ["mean", "max"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "10.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "histogram_quantile(0.50, sum(rate(turn_end_detection_latency_ms_bucket[5m])) by (le, implementation))", + "legendFormat": "p50 - {{implementation}}", + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "histogram_quantile(0.95, sum(rate(turn_end_detection_latency_ms_bucket[5m])) by (le, implementation))", + "legendFormat": "p95 - {{implementation}}", + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "histogram_quantile(0.99, sum(rate(turn_end_detection_latency_ms_bucket[5m])) by (le, implementation))", + "legendFormat": "p99 - {{implementation}}", + "refId": "C" + } + ], + "title": "Turn End Detection Latency", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "tooltip": false, + "viz": false, + "legend": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 8, + "x": 16, + "y": 16 + }, + "id": 7, + "options": { + "legend": { + "calcs": ["sum"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "pluginVersion": "10.0.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "rate(llm_errors_total[5m])", + "legendFormat": "LLM - {{provider}} - {{error}}", + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "rate(stt_errors_total[5m])", + "legendFormat": "STT - {{provider}} - {{error_type}}", + "refId": "B" + }, 
+ { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "rate(tts_errors_total[5m])", + "legendFormat": "TTS - {{provider}} - {{error_type}}", + "refId": "C" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "rate(turn_errors_total[5m])", + "legendFormat": "TURN - {{provider}} - {{error_type}}", + "refId": "D" + } + ], + "title": "All Errors Rate", + "type": "timeseries" + } + ], + "refresh": "5s", + "schemaVersion": 38, + "style": "dark", + "tags": ["vision-agents", "observability"], + "templating": { + "list": [] + }, + "time": { + "from": "now-15m", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "Vision Agents - Performance Metrics", + "uid": "vision-agents-metrics", + "version": 0, + "weekStart": "" +} diff --git a/observability/grafana/init-home-dashboard.sh b/observability/grafana/init-home-dashboard.sh new file mode 100755 index 00000000..cad54dda --- /dev/null +++ b/observability/grafana/init-home-dashboard.sh @@ -0,0 +1,33 @@ +#!/bin/bash + +# Wait for Grafana to be ready +echo "Waiting for Grafana to be ready..." +until curl -s http://grafana:3000/api/health > /dev/null 2>&1; do + echo "Grafana not ready yet, waiting..." + sleep 2 +done + +echo "Grafana is ready!" +sleep 5 # Give it a bit more time for provisioning to complete + +# Get the dashboard UID +DASHBOARD_UID="vision-agents-metrics" + +# Set the home dashboard for the organization +echo "Setting org home dashboard to Vision Agents - Performance Metrics..." +curl -X PUT \ + -H "Content-Type: application/json" \ + -d "{\"homeDashboardUID\":\"${DASHBOARD_UID}\"}" \ + http://grafana:3000/api/org/preferences + +# Also set it as the default home dashboard for admin user (for when they log in) +echo "" +echo "Setting admin user home dashboard..." +curl -X PUT \ + -u "admin:admin" \ + -H "Content-Type: application/json" \ + -d "{\"homeDashboardUID\":\"${DASHBOARD_UID}\"}" \ + http://grafana:3000/api/user/preferences + +echo "" +echo "Home dashboard configured successfully!" 
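
[Editor's note] The dashboard above queries series such as `llm_latency_ms_bucket` and `turn_detection_latency_ms_bucket`, which are the Prometheus-exported forms of the `*.latency.ms` histograms defined in `vision_agents/core/observability/metrics.py`; the labels the panels group by (`llm_class`, `stt_class`, `provider`) are the attributes attached through `Timer`. A minimal sketch of feeding one of those panels directly, assuming the package is importable; the `smart_turn` provider name is illustrative only:

```python
from vision_agents.core.observability.metrics import Timer, turn_detection_latency_ms


# Context-manager form: attributes set here become Prometheus labels,
# e.g. the "provider" label the Turn Detection Latency panel groups by.
async def detect(audio_chunk, provider_name: str = "smart_turn") -> None:
    with Timer(turn_detection_latency_ms) as timer:
        timer.attributes["provider"] = provider_name
        ...  # run the actual detection


# Decorator form: for bound methods the Timer also adds a "class" attribute
# with the fully qualified class path automatically.
@Timer(turn_detection_latency_ms, {"provider": "smart_turn"})
async def process_audio(audio_chunk) -> None:
    ...
```

In practice the base classes (`STT`, `LLM`, `TurnDetector`) already do this wrapping, so plugin implementations only need to implement `_process_audio`, `_simple_response`, or `detect_turn` and the panels populate without extra code.
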
diff --git a/observability/grafana/provisioning/dashboards/default.yml b/observability/grafana/provisioning/dashboards/default.yml new file mode 100644 index 00000000..ed949c18 --- /dev/null +++ b/observability/grafana/provisioning/dashboards/default.yml @@ -0,0 +1,13 @@ +apiVersion: 1 + +providers: + - name: 'Vision Agents' + orgId: 1 + folder: '' + type: file + disableDeletion: false + updateIntervalSeconds: 10 + allowUiUpdates: true + options: + path: /var/lib/grafana/dashboards + foldersFromFilesStructure: true diff --git a/observability/grafana/provisioning/datasources/prometheus.yml b/observability/grafana/provisioning/datasources/prometheus.yml new file mode 100644 index 00000000..cfd90598 --- /dev/null +++ b/observability/grafana/provisioning/datasources/prometheus.yml @@ -0,0 +1,12 @@ +apiVersion: 1 + +datasources: + - name: Prometheus + type: prometheus + access: proxy + url: http://prometheus:9090 + uid: prometheus + isDefault: true + editable: true + jsonData: + timeInterval: 5s diff --git a/observability/prometheus/prometheus.yml b/observability/prometheus/prometheus.yml new file mode 100644 index 00000000..83a2693a --- /dev/null +++ b/observability/prometheus/prometheus.yml @@ -0,0 +1,21 @@ +global: + scrape_interval: 15s + evaluation_interval: 15s + external_labels: + monitor: 'vision-agents-monitor' + +scrape_configs: + # Scrape metrics from Vision Agents application + - job_name: 'vision-agents' + static_configs: + - targets: ['host.docker.internal:9464'] + labels: + service: 'vision-agents' + environment: 'development' + scrape_interval: 5s + scrape_timeout: 5s + + # Scrape Prometheus self-metrics + - job_name: 'prometheus' + static_configs: + - targets: ['localhost:9090'] diff --git a/plugins/anthropic/vision_agents/plugins/anthropic/anthropic_llm.py b/plugins/anthropic/vision_agents/plugins/anthropic/anthropic_llm.py index 30691576..91012809 100644 --- a/plugins/anthropic/vision_agents/plugins/anthropic/anthropic_llm.py +++ b/plugins/anthropic/vision_agents/plugins/anthropic/anthropic_llm.py @@ -14,7 +14,10 @@ from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import Participant -from vision_agents.core.llm.events import LLMResponseChunkEvent, LLMResponseCompletedEvent +from vision_agents.core.llm.events import ( + LLMResponseChunkEvent, + LLMResponseCompletedEvent, +) from vision_agents.core.processors import Processor from . 
import events @@ -59,14 +62,17 @@ def __init__( super().__init__() self.events.register_events_from_module(events) self.model = model - self._pending_tool_uses_by_index: Dict[int, Dict[str, Any]] = {} # index -> {id, name, parts: []} + self._pending_tool_uses_by_index: Dict[ + int, Dict[str, Any] + ] = {} # index -> {id, name, parts: []} + self.provider_name = "anthropic" if client is not None: self.client = client else: self.client = anthropic.AsyncAnthropic(api_key=api_key) - async def simple_response( + async def _simple_response( self, text: str, processors: Optional[List[Processor]] = None, @@ -107,7 +113,7 @@ async def create_message(self, *args, **kwargs) -> LLMResponseEvent[Any]: # ensure the AI remembers the past conversation new_messages = kwargs["messages"] - if hasattr(self, '_conversation') and self._conversation: + if hasattr(self, "_conversation") and self._conversation: old_messages = [m.original for m in self._conversation.messages] kwargs["messages"] = old_messages + new_messages # Add messages to conversation @@ -122,7 +128,7 @@ async def create_message(self, *args, **kwargs) -> LLMResponseEvent[Any]: # Extract text from Claude's response format - safely handle all text blocks text = self._concat_text_blocks(original.content) llm_response = LLMResponseEvent(original, text) - + # Multi-hop tool calling loop for non-streaming function_calls = self._extract_tool_calls_from_response(original) if function_calls: @@ -131,39 +137,53 @@ async def create_message(self, *args, **kwargs) -> LLMResponseEvent[Any]: rounds = 0 seen: set[tuple[str, str, str]] = set() current_calls = function_calls - + while current_calls and rounds < MAX_ROUNDS: # Execute calls concurrently with dedup - triples, seen = await self._dedup_and_execute(current_calls, seen=seen, max_concurrency=8, timeout_s=30) # type: ignore[arg-type] - + triples, seen = await self._dedup_and_execute( + current_calls, # type: ignore[arg-type] + seen=seen, + max_concurrency=8, + timeout_s=30, + ) + if not triples: break - + # Build tool_result user message assistant_content = [] tool_result_blocks = [] for tc, res, err in triples: - assistant_content.append({ - "type": "tool_use", - "id": tc["id"], - "name": tc["name"], - "input": tc["arguments_json"], - }) - + assistant_content.append( + { + "type": "tool_use", + "id": tc["id"], + "name": tc["name"], + "input": tc["arguments_json"], + } + ) + payload = self._sanitize_tool_output(res) - tool_result_blocks.append({ - "type": "tool_result", - "tool_use_id": tc["id"], - "content": payload, - }) + tool_result_blocks.append( + { + "type": "tool_result", + "tool_use_id": tc["id"], + "content": payload, + } + ) assistant_msg = {"role": "assistant", "content": assistant_content} - user_tool_results_msg = {"role": "user", "content": tool_result_blocks} + user_tool_results_msg = { + "role": "user", + "content": tool_result_blocks, + } messages = messages + [assistant_msg, user_tool_results_msg] # Ask again WITH tools so Claude can do another hop tools_cfg = { - "tools": self._convert_tools_to_provider_format(self.get_available_functions()), + "tools": self._convert_tools_to_provider_format( + self.get_available_functions() + ), "tool_choice": {"type": "auto"}, "stream": False, "model": self.model, @@ -172,22 +192,29 @@ async def create_message(self, *args, **kwargs) -> LLMResponseEvent[Any]: } follow_up_response = await self.client.messages.create(**tools_cfg) - + # Extract new tool calls from follow-up response - current_calls = 
self._extract_tool_calls_from_response(follow_up_response) - llm_response = LLMResponseEvent(follow_up_response, self._concat_text_blocks(follow_up_response.content)) + current_calls = self._extract_tool_calls_from_response( + follow_up_response + ) + llm_response = LLMResponseEvent( + follow_up_response, + self._concat_text_blocks(follow_up_response.content), + ) rounds += 1 - + # Finalization pass: no tools so Claude must answer in text if current_calls or rounds > 0: # Only if we had tool calls final_response = await self.client.messages.create( model=self.model, - messages=messages, # includes assistant tool_use + user tool_result blocks + messages=messages, # includes assistant tool_use + user tool_result blocks stream=False, - max_tokens=1000 + max_tokens=1000, + ) + llm_response = LLMResponseEvent( + final_response, self._concat_text_blocks(final_response.content) ) - llm_response = LLMResponseEvent(final_response, self._concat_text_blocks(final_response.content)) - + elif isinstance(original, AsyncStream): stream: AsyncStream[RawMessageStreamEvent] = original text_parts: List[str] = [] @@ -195,7 +222,9 @@ async def create_message(self, *args, **kwargs) -> LLMResponseEvent[Any]: # 1) First round: read stream, gather initial tool_use calls async for event in stream: - llm_response_optional = self._standardize_and_emit_event(event, text_parts) + llm_response_optional = self._standardize_and_emit_event( + event, text_parts + ) if llm_response_optional is not None: llm_response = llm_response_optional # Collect tool_use calls as they complete (your helper already reconstructs args) @@ -213,7 +242,12 @@ async def create_message(self, *args, **kwargs) -> LLMResponseEvent[Any]: last_followup_stream = None while accumulated_calls and rounds < MAX_ROUNDS: # Execute calls concurrently with dedup - triples, seen = await self._dedup_and_execute(accumulated_calls, seen=seen, max_concurrency=8, timeout_s=30) # type: ignore[arg-type] + triples, seen = await self._dedup_and_execute( + accumulated_calls, # type: ignore[arg-type] + seen=seen, + max_concurrency=8, + timeout_s=30, + ) # Build tool_result user message # Also reconstruct the assistant tool_use message that triggered these calls @@ -221,22 +255,26 @@ async def create_message(self, *args, **kwargs) -> LLMResponseEvent[Any]: executed_calls: List[NormalizedToolCallItem] = [] for tc, res, err in triples: executed_calls.append(tc) - assistant_content.append({ - "type": "tool_use", - "id": tc["id"], - "name": tc["name"], - "input": tc["arguments_json"], - }) + assistant_content.append( + { + "type": "tool_use", + "id": tc["id"], + "name": tc["name"], + "input": tc["arguments_json"], + } + ) # tool_result blocks (sanitize to keep payloads safe) tool_result_blocks = [] for tc, res, err in triples: payload = self._sanitize_tool_output(res) - tool_result_blocks.append({ - "type": "tool_result", - "tool_use_id": tc["id"], - "content": payload, - }) + tool_result_blocks.append( + { + "type": "tool_result", + "tool_use_id": tc["id"], + "content": payload, + } + ) assistant_msg = {"role": "assistant", "content": assistant_content} user_tool_results_msg = {"role": "user", "content": tool_result_blocks} @@ -244,7 +282,9 @@ async def create_message(self, *args, **kwargs) -> LLMResponseEvent[Any]: # Ask again WITH tools so Claude can do another hop tools_cfg = { - "tools": self._convert_tools_to_provider_format(self.get_available_functions()), + "tools": self._convert_tools_to_provider_format( + self.get_available_functions() + ), "tool_choice": {"type": 
"auto"}, "stream": True, "model": self.model, @@ -259,7 +299,9 @@ async def create_message(self, *args, **kwargs) -> LLMResponseEvent[Any]: accumulated_calls = [] # reset; we'll refill with new calls async for ev in follow_up_stream: last_followup_stream = ev - llm_response_optional = self._standardize_and_emit_event(ev, follow_up_text_parts) + llm_response_optional = self._standardize_and_emit_event( + ev, follow_up_text_parts + ) if llm_response_optional is not None: llm_response = llm_response_optional new_calls, _ = self._extract_tool_calls_from_stream_chunk(ev, None) @@ -276,14 +318,16 @@ async def create_message(self, *args, **kwargs) -> LLMResponseEvent[Any]: if accumulated_calls or rounds > 0: # Only if we had tool calls final_stream = await self.client.messages.create( model=self.model, - messages=messages, # includes assistant tool_use + user tool_result blocks + messages=messages, # includes assistant tool_use + user tool_result blocks stream=True, - max_tokens=1000 + max_tokens=1000, ) final_text_parts: List[str] = [] async for ev in final_stream: last_followup_stream = ev - llm_response_optional = self._standardize_and_emit_event(ev, final_text_parts) + llm_response_optional = self._standardize_and_emit_event( + ev, final_text_parts + ) if llm_response_optional is not None: llm_response = llm_response_optional if final_text_parts: @@ -291,8 +335,17 @@ async def create_message(self, *args, **kwargs) -> LLMResponseEvent[Any]: # 4) Done -> return all collected text total_text = "".join(text_parts) - llm_response = LLMResponseEvent(last_followup_stream or original, total_text) # type: ignore - self.events.send(LLMResponseCompletedEvent(original=last_followup_stream or original, text=total_text, plugin_name="anthropic")) + llm_response = LLMResponseEvent( + last_followup_stream or original, # type: ignore[arg-type] + total_text, + ) + self.events.send( + LLMResponseCompletedEvent( + original=last_followup_stream or original, + text=total_text, + plugin_name="anthropic", + ) + ) return llm_response @@ -303,10 +356,9 @@ def _standardize_and_emit_event( Forwards the events and also send out a standardized version (the agent class hooks into that) """ # forward the native event - self.events.send(events.ClaudeStreamEvent( - plugin_name="anthropic", - event_data=event - )) + self.events.send( + events.ClaudeStreamEvent(plugin_name="anthropic", event_data=event) + ) # send a standardized version for delta and response if event.type == "content_block_delta": @@ -314,14 +366,16 @@ def _standardize_and_emit_event( if hasattr(delta_event.delta, "text") and delta_event.delta.text: text_parts.append(delta_event.delta.text) - self.events.send(LLMResponseChunkEvent( - plugin_name="antrhopic", - content_index=delta_event.index, - item_id="", - output_index=0, - sequence_number=0, - delta=delta_event.delta.text, - )) + self.events.send( + LLMResponseChunkEvent( + plugin_name="anthropic", + content_index=delta_event.index, + item_id="", + output_index=0, + sequence_number=0, + delta=delta_event.delta.text, + ) + ) elif event.type == "message_stop": stop_event: RawMessageStopEvent = event total_text = "".join(text_parts) @@ -354,13 +408,15 @@ def _normalize_message(claude_messages: Any) -> List["Message"]: return messages - def _convert_tools_to_provider_format(self, tools: List[ToolSchema]) -> List[Dict[str, Any]]: + def _convert_tools_to_provider_format( + self, tools: List[ToolSchema] + ) -> List[Dict[str, Any]]: """ Convert ToolSchema objects to Anthropic format. 
- + Args: tools: List of ToolSchema objects - + Returns: List of tools in Anthropic format """ @@ -369,37 +425,42 @@ def _convert_tools_to_provider_format(self, tools: List[ToolSchema]) -> List[Dic anthropic_tool = { "name": tool["name"], "description": tool.get("description", ""), - "input_schema": tool["parameters_schema"] + "input_schema": tool["parameters_schema"], } anthropic_tools.append(anthropic_tool) return anthropic_tools - def _extract_tool_calls_from_response(self, response: Any) -> List[NormalizedToolCallItem]: + def _extract_tool_calls_from_response( + self, response: Any + ) -> List[NormalizedToolCallItem]: """ Extract tool calls from Anthropic response. - + Args: response: Anthropic response object - + Returns: List of normalized tool call items """ tool_calls = [] - - if hasattr(response, 'content') and response.content: + + if hasattr(response, "content") and response.content: for content_block in response.content: - if hasattr(content_block, 'type') and content_block.type == "tool_use": + if hasattr(content_block, "type") and content_block.type == "tool_use": tool_call: NormalizedToolCallItem = { "type": "tool_call", "id": content_block.id, # Critical: capture the id for tool_result "name": content_block.name, - "arguments_json": content_block.input or {} # normalize to arguments_json + "arguments_json": content_block.input + or {}, # normalize to arguments_json } tool_calls.append(tool_call) - + return tool_calls - def _extract_tool_calls_from_stream_chunk(self, chunk: Any, current_tool_call: Optional[NormalizedToolCallItem] = None) -> tuple[List[NormalizedToolCallItem], Optional[NormalizedToolCallItem]]: # type: ignore[override] + def _extract_tool_calls_from_stream_chunk( # type: ignore[override] + self, chunk: Any, current_tool_call: Optional[NormalizedToolCallItem] = None + ) -> tuple[List[NormalizedToolCallItem], Optional[NormalizedToolCallItem]]: """ Extract tool calls from Anthropic streaming chunk using index-keyed accumulation. 
Args: @@ -409,22 +470,22 @@ def _extract_tool_calls_from_stream_chunk(self, chunk: Any, current_tool_call: O Tuple of (completed tool calls, current tool call being accumulated) """ tool_calls = [] - t = getattr(chunk, 'type', None) + t = getattr(chunk, "type", None) if t == "content_block_start": - cb = getattr(chunk, 'content_block', None) - if getattr(cb, 'type', None) == "tool_use": + cb = getattr(chunk, "content_block", None) + if getattr(cb, "type", None) == "tool_use": if cb is not None: self._pending_tool_uses_by_index[chunk.index] = { "id": cb.id, "name": cb.name, - "parts": [] + "parts": [], } elif t == "content_block_delta": - d = getattr(chunk, 'delta', None) - if getattr(d, 'type', None) == "input_json_delta": - pj = getattr(d, 'partial_json', None) + d = getattr(chunk, "delta", None) + if getattr(d, "type", None) == "input_json_delta": + pj = getattr(d, "partial_json", None) if pj is not None and chunk.index in self._pending_tool_uses_by_index: self._pending_tool_uses_by_index[chunk.index]["parts"].append(pj) @@ -440,12 +501,14 @@ def _extract_tool_calls_from_stream_chunk(self, chunk: Any, current_tool_call: O "type": "tool_call", "id": pending["id"], "name": pending["name"], - "arguments_json": args + "arguments_json": args, } tool_calls.append(tool_call_item) return tool_calls, None - def _create_tool_result_message(self, tool_calls: List[NormalizedToolCallItem], results: List[Any]) -> List[Dict[str, Any]]: + def _create_tool_result_message( + self, tool_calls: List[NormalizedToolCallItem], results: List[Any] + ) -> List[Dict[str, Any]]: """ Create tool result messages for Anthropic. tool_calls: List of tool calls that were executed @@ -461,17 +524,19 @@ def _create_tool_result_message(self, tool_calls: List[NormalizedToolCallItem], payload = str(result) else: payload = json.dumps(result) - blocks.append({ - "type": "tool_result", - "tool_use_id": tool_call["id"], # Critical: must match tool_use.id - "content": payload - }) + blocks.append( + { + "type": "tool_result", + "tool_use_id": tool_call["id"], # Critical: must match tool_use.id + "content": payload, + } + ) return [{"role": "user", "content": blocks}] def _concat_text_blocks(self, content): """Safely extract text from all text blocks in content.""" out = [] for b in content or []: - if getattr(b, 'type', None) == "text" and getattr(b, 'text', None): + if getattr(b, "type", None) == "text" and getattr(b, "text", None): out.append(b.text) return "".join(out) diff --git a/plugins/aws/vision_agents/plugins/aws/aws_llm.py b/plugins/aws/vision_agents/plugins/aws/aws_llm.py index 4bb6cb90..a4e04916 100644 --- a/plugins/aws/vision_agents/plugins/aws/aws_llm.py +++ b/plugins/aws/vision_agents/plugins/aws/aws_llm.py @@ -90,12 +90,12 @@ def _create_client(): await asyncio.to_thread(_create_client) return self._client - async def simple_response( + async def _simple_response( self, text: str, processors: Optional[List[Processor]] = None, participant: Optional[Participant] = None, - ): + ) -> LLMResponseEvent[Any]: """ Simple response is a standardized way to create a response. 
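Across the providers touched by this change, the public `simple_response` entry point becomes an internal `_simple_response` hook that returns an `LLMResponseEvent`. A minimal sketch of a provider implementing the new hook — `EchoLLM` is purely illustrative, and it assumes the base `LLM` class now exposes the public `simple_response` wrapper around this hook:

    from typing import Any, List, Optional

    from vision_agents.core.edge.types import Participant
    from vision_agents.core.llm.llm import LLM, LLMResponseEvent
    from vision_agents.core.processors import Processor


    class EchoLLM(LLM):
        """Hypothetical provider, shown only to illustrate the renamed hook."""

        def __init__(self) -> None:
            super().__init__()
            self.provider_name = "echo"  # mirrors the provider_name attribute added in this PR

        async def _simple_response(
            self,
            text: str,
            processors: Optional[List[Processor]] = None,
            participant: Optional[Participant] = None,
        ) -> LLMResponseEvent[Any]:
            # A real provider would call its SDK here; this one just echoes the prompt.
            return LLMResponseEvent(original=None, text=f"echo: {text}")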
diff --git a/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py b/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py index 0c598f34..b5da2b80 100644 --- a/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py +++ b/plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py @@ -75,7 +75,7 @@ def __init__( self._connection_context: Optional[Any] = None self._listen_task: Optional[asyncio.Task[Any]] = None - async def process_audio( + async def _process_audio( self, pcm_data: PcmData, participant: Optional[Participant] = None, @@ -127,20 +127,19 @@ async def start(self): "encoding": "linear16", "sample_rate": "16000", } - + # Add optional parameters if specified if self.eot_threshold is not None: connect_params["eot_threshold"] = str(self.eot_threshold) if self.eager_eot_threshold is not None: connect_params["eager_eot_threshold"] = str(self.eager_eot_threshold) - + # Connect to Deepgram v2 listen WebSocket with timeout self._connection_context = self.client.listen.v2.connect(**connect_params) - + # Add timeout for connection establishment self.connection = await asyncio.wait_for( - self._connection_context.__aenter__(), - timeout=10.0 + self._connection_context.__aenter__(), timeout=10.0 ) # Register event handlers @@ -149,7 +148,7 @@ async def start(self): self.connection.on(EventType.MESSAGE, self._on_message) self.connection.on(EventType.ERROR, self._on_error) self.connection.on(EventType.CLOSE, self._on_close) - + # Start listening for events self._listen_task = asyncio.create_task(self.connection.start_listening()) @@ -159,7 +158,7 @@ async def start(self): def _on_message(self, message): """ Event handler for messages from Deepgram. - + Args: message: The message object from Deepgram """ @@ -189,7 +188,9 @@ def _on_message(self, message): words = getattr(message, "words", []) if words: confidences = [w.confidence for w in words if hasattr(w, "confidence")] - avg_confidence = sum(confidences) / len(confidences) if confidences else 0.0 + avg_confidence = ( + sum(confidences) / len(confidences) if confidences else 0.0 + ) else: avg_confidence = 0.0 @@ -207,7 +208,7 @@ def _on_message(self, message): "end_of_turn_confidence": end_of_turn_confidence, "turn_index": getattr(message, "turn_index", None), "event": event, - } + }, ) # Use the participant from the most recent process_audio call @@ -234,7 +235,7 @@ def _on_open(self, message): def _on_error(self, error): """ Event handler for errors from Deepgram. 
- + Args: error: The error from Deepgram """ diff --git a/plugins/fish/vision_agents/plugins/fish/stt.py b/plugins/fish/vision_agents/plugins/fish/stt.py index a73d009a..cc0e6c8f 100644 --- a/plugins/fish/vision_agents/plugins/fish/stt.py +++ b/plugins/fish/vision_agents/plugins/fish/stt.py @@ -50,7 +50,7 @@ def __init__( self.language = language - async def process_audio( + async def _process_audio( self, pcm_data: PcmData, participant: Optional[Participant] = None, @@ -128,7 +128,7 @@ async def process_audio( # Create a default participant if none provided if participant is None: participant = Participant(original=None, user_id="test-user") - + self._emit_transcript_event(transcript_text, participant, response_metadata) except Exception as e: diff --git a/plugins/gemini/vision_agents/plugins/gemini/gemini_llm.py b/plugins/gemini/vision_agents/plugins/gemini/gemini_llm.py index 78ed27e7..5cb5945c 100644 --- a/plugins/gemini/vision_agents/plugins/gemini/gemini_llm.py +++ b/plugins/gemini/vision_agents/plugins/gemini/gemini_llm.py @@ -37,8 +37,8 @@ class GeminiLLM(LLM): Examples: - from vision_agents.plugins import gemini - llm = gemini.LLM() + from vision_agents.plugins import gemini + llm = gemini.LLM() """ def __init__( @@ -59,13 +59,14 @@ def __init__( self.events.register_events_from_module(events) self.model = model self.chat: Optional[Any] = None + self.provider_name = "gemini" if client is not None: self.client = client else: self.client = Client(api_key=api_key).aio - async def simple_response( + async def _simple_response( self, text: str, processors: Optional[List[Processor]] = None, @@ -84,7 +85,7 @@ async def simple_response( """ return await self.send_message(message=text) - async def send_message(self, *args, **kwargs): + async def send_message(self, *args, **kwargs) -> LLMResponseEvent[Any]: """ send_message gives you full support/access to the native Gemini chat send message method under the hood it calls chat.send_message_stream(*args, **kwargs) @@ -163,7 +164,6 @@ async def send_message(self, *args, **kwargs): sanitized_res = {} for k, v in res.items(): sanitized_res[k] = self._sanitize_tool_output(v) - parts.append( types.Part.from_function_response( name=tc["name"], response=sanitized_res diff --git a/plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py b/plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py index c4e8597f..d0e6e5dc 100644 --- a/plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py +++ b/plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py @@ -133,6 +133,24 @@ async def simple_response( self.logger.info("Simple response called with text: %s", text) await self.send_realtime_input(text=text) + async def _simple_response( + self, + text: str, + processors: Optional[List[Processor]] = None, + participant: Optional[Participant] = None, + ): + """ + Internal simple response implementation required by LLM base class. + + Note: Gemini Realtime is event-driven and doesn't return responses directly. + This implementation sends the text via the public simple_response method. 
+ """ + from vision_agents.core.llm.llm import LLMResponseEvent + + await self.simple_response(text, processors, participant) + # Return empty LLMResponseEvent since Realtime API is event-driven + return LLMResponseEvent(original=None, text="") + async def simple_audio_response( self, pcm: PcmData, participant: Optional[Participant] = None ): diff --git a/plugins/krisp/vision_agents/plugins/krisp/turn_detection.py b/plugins/krisp/vision_agents/plugins/krisp/turn_detection.py index b62d15e5..6a1d2d1d 100644 --- a/plugins/krisp/vision_agents/plugins/krisp/turn_detection.py +++ b/plugins/krisp/vision_agents/plugins/krisp/turn_detection.py @@ -75,7 +75,7 @@ def is_detecting(self) -> bool: """Check if turn detection is currently active.""" return self._is_detecting - async def process_audio( + async def detect_turn( self, audio_data: PcmData, participant: Participant, diff --git a/plugins/openai/vision_agents/plugins/openai/openai_llm.py b/plugins/openai/vision_agents/plugins/openai/openai_llm.py index 06a19940..c2fbe66d 100644 --- a/plugins/openai/vision_agents/plugins/openai/openai_llm.py +++ b/plugins/openai/vision_agents/plugins/openai/openai_llm.py @@ -69,6 +69,7 @@ def __init__( self.model = model self.openai_conversation: Optional[Any] = None self.conversation = None + self.provider_name = "openai" if client is not None: self.client = client @@ -77,7 +78,7 @@ def __init__( else: self.client = AsyncOpenAI(base_url=base_url) - async def simple_response( + async def _simple_response( self, text: str, processors: Optional[List[Processor]] = None, diff --git a/plugins/openai/vision_agents/plugins/openai/openai_realtime.py b/plugins/openai/vision_agents/plugins/openai/openai_realtime.py index de93504f..d0fd53ba 100644 --- a/plugins/openai/vision_agents/plugins/openai/openai_realtime.py +++ b/plugins/openai/vision_agents/plugins/openai/openai_realtime.py @@ -126,6 +126,24 @@ async def simple_response( """ await self.rtc.send_text(text) + async def _simple_response( + self, + text: str, + processors: Optional[List[Processor]] = None, + participant: Optional[Participant] = None, + ): + """ + Internal simple response implementation required by LLM base class. + + Note: OpenAI Realtime is event-driven and doesn't return responses directly. + This implementation sends the text via the public simple_response method. + """ + from vision_agents.core.llm.llm import LLMResponseEvent + + await self.simple_response(text, processors, participant) + # Return empty LLMResponseEvent since Realtime API is event-driven + return LLMResponseEvent(original=None, text="") + async def simple_audio_response( self, audio: PcmData, participant: Optional[Participant] = None ): diff --git a/plugins/openrouter/vision_agents/plugins/openrouter/openrouter_llm.py b/plugins/openrouter/vision_agents/plugins/openrouter/openrouter_llm.py index 52664191..730f968e 100644 --- a/plugins/openrouter/vision_agents/plugins/openrouter/openrouter_llm.py +++ b/plugins/openrouter/vision_agents/plugins/openrouter/openrouter_llm.py @@ -1,4 +1,5 @@ """OpenRouter LLM implementation using OpenAI-compatible API.""" + import os from typing import Any @@ -24,7 +25,7 @@ def __init__( **kwargs: Any, ) -> None: """Initialize OpenRouter LLM. - + Args: api_key: OpenRouter API key. If not provided, uses OPENROUTER_API_KEY env var. base_url: OpenRouter API base URL. 
@@ -39,6 +40,7 @@ def __init__( model=model, **kwargs, ) + self.provider_name = "openrouter" async def create_conversation(self): # Do nothing, dont call super @@ -51,11 +53,10 @@ def add_conversation_history(self, kwargs): new_messages = kwargs["input"] if not isinstance(new_messages, list): new_messages = [dict(content=new_messages, role="user", type="message")] - if hasattr(self, '_conversation') and self._conversation: + if hasattr(self, "_conversation") and self._conversation: old_messages = [m.original for m in self._conversation.messages] kwargs["input"] = old_messages + new_messages # Add messages to conversation normalized_messages = self._normalize_message(new_messages) for msg in normalized_messages: self._conversation.messages.append(msg) - diff --git a/plugins/smart_turn/vision_agents/plugins/smart_turn/smart_turn_detection.py b/plugins/smart_turn/vision_agents/plugins/smart_turn/smart_turn_detection.py index 08684d32..498f1c5f 100644 --- a/plugins/smart_turn/vision_agents/plugins/smart_turn/smart_turn_detection.py +++ b/plugins/smart_turn/vision_agents/plugins/smart_turn/smart_turn_detection.py @@ -12,6 +12,11 @@ from vision_agents.core.agents import Conversation from vision_agents.core.agents.agents import default_agent_options, AgentOptions from vision_agents.core.edge.types import Participant +from vision_agents.core.observability.metrics import ( + Timer, + turn_vad_latency_ms, + turn_end_detection_latency_ms, +) from vision_agents.core.turn_detection import ( TurnDetector, @@ -109,7 +114,9 @@ def __init__( self._audio_queue: asyncio.Queue[Any] = asyncio.Queue() self._processing_task: Optional[asyncio.Task[Any]] = None self._shutdown_event = asyncio.Event() - self._processing_active = asyncio.Event() # Tracks if background task is processing + self._processing_active = ( + asyncio.Event() + ) # Tracks if background task is processing if options is None: self.options = default_agent_options() @@ -149,7 +156,7 @@ async def _prepare_silero_vad(self): SileroVAD, path, reset_interval_seconds=self.vad_reset_interval_seconds ) - async def process_audio( + async def detect_turn( self, audio_data: PcmData, participant: Participant, @@ -168,29 +175,38 @@ async def _process_audio_loop(self): Background task that continuously processes audio from the queue. This is where the actual VAD and turn detection logic runs. 
""" - while not self._shutdown_event.is_set(): - try: - # Wait for audio packet with timeout to allow shutdown - audio_data, participant, conversation = await asyncio.wait_for( - self._audio_queue.get(), timeout=1.0 - ) - - # Signal that we're actively processing - self._processing_active.set() - + try: + while not self._shutdown_event.is_set(): try: - # Process the audio packet - await self._process_audio_packet(audio_data, participant) - finally: - # If queue is empty, clear the processing flag - if self._audio_queue.empty(): - self._processing_active.clear() - - except asyncio.TimeoutError: - # Timeout is expected - continue loop to check shutdown - continue - except Exception as e: - logger.error(f"Error processing audio: {e}") + # Wait for audio packet with timeout to allow shutdown + audio_data, participant, conversation = await asyncio.wait_for( + self._audio_queue.get(), timeout=1.0 + ) + + # Signal that we're actively processing + self._processing_active.set() + + try: + # Process the audio packet + await self._process_audio_packet(audio_data, participant) + finally: + # If queue is empty, clear the processing flag + if self._audio_queue.empty(): + self._processing_active.clear() + + except asyncio.TimeoutError: + # Timeout is expected - continue loop to check shutdown + continue + except Exception as e: + logger.error(f"Error processing audio: {e}") + except asyncio.CancelledError: + # Task was cancelled - ensure clean shutdown + logger.debug("Audio processing loop cancelled") + raise + finally: + # Always clear flags on shutdown to allow proper lifecycle transitions + self._processing_active.clear() + self._shutdown_event.clear() async def _process_audio_packet( self, @@ -234,7 +250,10 @@ async def _process_audio_packet( # detect speech in small 512 chunks, gather to larger audio segments with speech for chunk in audio_chunks[:-1]: # predict if this segment has speech - speech_probability = await self.vad.predict_speech(chunk.samples) + with Timer(turn_vad_latency_ms) as timer: + timer.attributes["samples"] = len(chunk.samples) + timer.attributes["implementation"] = "smart_turn" + speech_probability = await self.vad.predict_speech(chunk.samples) is_speech = speech_probability > self.speech_probability_threshold if self._active_segment is not None: @@ -252,7 +271,11 @@ async def _process_audio_packet( # TODO: make this testable trailing_silence_ms = ( - self._silence.trailing_silence_chunks * 512 / 16000 * 1000 * 5 #DTX correction + self._silence.trailing_silence_chunks + * 512 + / 16000 + * 1000 + * 5 # DTX correction ) long_silence = trailing_silence_ms > self._trailing_silence_ms max_duration_reached = ( @@ -269,7 +292,16 @@ async def _process_audio_packet( merged.append(self._active_segment) merged = merged.tail(8, True, "start") # see if we've completed the turn - prediction = await self._predict_turn_completed(merged, participant) + with Timer(turn_end_detection_latency_ms) as timer: + timer.attributes["implementation"] = "smart_turn" + timer.attributes["audio_duration_ms"] = merged.duration_ms + timer.attributes["samples"] = len(merged.samples) + timer.attributes["trailing_silence_ms"] = trailing_silence_ms + prediction = await self._predict_turn_completed( + merged, participant + ) + timer.attributes["prediction"] = prediction + timer.attributes["turn_ended"] = prediction > 0.5 turn_ended = prediction > 0.5 if turn_ended: self._emit_end_turn_event( @@ -304,19 +336,19 @@ async def _process_audio_packet( async def wait_for_processing_complete(self, timeout: float = 5.0) -> 
None: """Wait for all queued audio to be processed. Useful for testing.""" start_time = time.time() - + # Wait for queue to be empty AND no active processing while (time.time() - start_time) < timeout: queue_empty = self._audio_queue.qsize() == 0 not_processing = not self._processing_active.is_set() - + if queue_empty and not_processing: # Give a small final buffer to ensure events are emitted await asyncio.sleep(0.05) return - + await asyncio.sleep(0.01) - + # Timeout reached logger.warning(f"wait_for_processing_complete timed out after {timeout}s") @@ -380,16 +412,16 @@ def _blocking_predict_turn_completed( def _build_smart_turn_session(self): path = os.path.join(self.options.model_dir, SMART_TURN_ONNX_FILENAME) - + # Load model into memory to avoid multi-worker file access issues with open(path, "rb") as f: model_bytes = f.read() - + so = ort.SessionOptions() so.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL so.inter_op_num_threads = 1 so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL - + # Load from memory instead of file path return ort.InferenceSession(model_bytes, sess_options=so) @@ -397,7 +429,12 @@ def _build_smart_turn_session(self): class SileroVAD: """Minimal Silero VAD ONNX wrapper for 16 kHz, mono, chunk=512.""" - def __init__(self, model_path: str, model_bytes: Optional[bytes] = None, reset_interval_seconds: float = 5.0): + def __init__( + self, + model_path: str, + model_bytes: Optional[bytes] = None, + reset_interval_seconds: float = 5.0, + ): """ Initialize Silero VAD. @@ -410,10 +447,10 @@ def __init__(self, model_path: str, model_bytes: Optional[bytes] = None, reset_i if model_bytes is None: with open(model_path, "rb") as f: model_bytes = f.read() - + opts = ort.SessionOptions() opts.inter_op_num_threads = 1 - + # Load from memory instead of file path self.session = ort.InferenceSession(model_bytes, sess_options=opts) self.context_size = 64 # Silero uses 64-sample context at 16 kHz diff --git a/plugins/ultralytics/vision_agents/plugins/ultralytics/yolo_pose_processor.py b/plugins/ultralytics/vision_agents/plugins/ultralytics/yolo_pose_processor.py index 31b0d656..4e342011 100644 --- a/plugins/ultralytics/vision_agents/plugins/ultralytics/yolo_pose_processor.py +++ b/plugins/ultralytics/vision_agents/plugins/ultralytics/yolo_pose_processor.py @@ -18,14 +18,15 @@ AudioVideoProcessor, ) from vision_agents.core.utils.video_forwarder import VideoForwarder +from vision_agents.core.observability.metrics import Timer, meter from vision_agents.core.utils.video_track import QueuedVideoTrack logger = logging.getLogger(__name__) -DEFAULT_WIDTH = 640 -DEFAULT_HEIGHT = 480 -DEFAULT_WIDTH = 1920 -DEFAULT_HEIGHT = 1080 +# Metrics for YOLO pose detection +yolo_pose_inference_ms = meter.create_histogram( + "yolo.pose.inference.ms", unit="ms", description="YOLO pose inference latency" +) """ TODO: video track & Queuing need more testing/ thought @@ -49,6 +50,7 @@ class YOLOPoseVideoTrack(QueuedVideoTrack): pass + class YOLOPoseProcessor(AudioVideoProcessor, VideoProcessorMixin, VideoPublisherMixin): """ Yolo pose detection processor. 
@@ -248,16 +250,28 @@ def _process_pose_sync( ) # Run pose detection - yolo_start = time.perf_counter() - pose_results = self.pose_model( - frame_array, - verbose=False, - # imgsz=self.imgsz, - conf=self.conf_threshold, - device=self.device, + with Timer(yolo_pose_inference_ms) as timer: + timer.attributes["frame_width"] = frame_array.shape[1] + timer.attributes["frame_height"] = frame_array.shape[0] + timer.attributes["conf_threshold"] = self.conf_threshold + timer.attributes["device"] = str(self.device) + + pose_results = self.pose_model( + frame_array, + verbose=False, + # imgsz=self.imgsz, + conf=self.conf_threshold, + device=self.device, + ) + + # Add detected person count to metrics + timer.attributes["persons_detected"] = ( + len(pose_results) if pose_results else 0 + ) + + logger.debug( + f"🎯 YOLO inference completed in {timer.last_elapsed_ms:.1f}ms" ) - yolo_time = time.perf_counter() - yolo_start - logger.debug(f"🎯 YOLO inference completed in {yolo_time:.3f}s") if not pose_results: logger.debug("❌ No pose results detected") diff --git a/plugins/vogent/vision_agents/plugins/vogent/vogent_turn_detection.py b/plugins/vogent/vision_agents/plugins/vogent/vogent_turn_detection.py index c524b8d9..e74db1ec 100644 --- a/plugins/vogent/vision_agents/plugins/vogent/vogent_turn_detection.py +++ b/plugins/vogent/vision_agents/plugins/vogent/vogent_turn_detection.py @@ -16,12 +16,25 @@ TurnStartedEvent, TurnEndedEvent, ) +from vision_agents.core.observability.metrics import ( + Timer, + meter, + turn_vad_latency_ms, + turn_end_detection_latency_ms, +) from vision_agents.core.utils.utils import ensure_model import logging logger = logging.getLogger(__name__) +# Vogent-specific metric for Whisper transcription +vogent_whisper_latency_ms = meter.create_histogram( + "vogent.whisper.latency.ms", + unit="ms", + description="Vogent Whisper transcription latency", +) + # Silero VAD model (reused from smart_turn) SILERO_ONNX_FILENAME = "silero_vad.onnx" SILERO_ONNX_URL = "https://github.com/snakers4/silero-vad/raw/master/src/silero_vad/data/silero_vad.onnx" @@ -40,15 +53,15 @@ class Silence: class VogentTurnDetection(TurnDetector): """ Vogent Turn Detection combines audio intonation and text context for accurate turn detection. - + This implementation: 1. Uses Silero VAD to detect when speech starts/stops 2. Uses faster-whisper to transcribe audio in real-time 3. Uses Vogent Turn model (multimodal) to detect turn completion - + Vogent operates on both audio features AND text context, making it more accurate than audio-only approaches, especially for handling incomplete thoughts. - + Reference: https://github.com/vogent/vogent-turn Blogpost: https://blog.vogent.ai/posts/voturn-80m-state-of-the-art-turn-detection-for-voice-agents """ @@ -66,7 +79,7 @@ def __init__( ): """ Initialize Vogent Turn Detection. 
- + Args: whisper_model_size: Faster-whisper model size (tiny, base, small, medium, large) vad_reset_interval_seconds: Reset VAD internal state every N seconds to prevent drift @@ -78,7 +91,7 @@ def __init__( model_dir: Directory to store model files """ super().__init__() - + # Configuration parameters self.whisper_model_size = whisper_model_size self.vad_reset_interval_seconds = vad_reset_interval_seconds @@ -88,7 +101,7 @@ def __init__( self.max_segment_duration_seconds = max_segment_duration_seconds self.vogent_threshold = vogent_threshold self.model_dir = model_dir - + # Audio buffering for processing self._audio_buffer = PcmData( sample_rate=RATE, channels=1, format=AudioFormat.F32 @@ -99,12 +112,12 @@ def __init__( ) self._active_segment: Optional[PcmData] = None self._trailing_silence_ms = self.silence_duration_ms - + # Producer-consumer pattern: audio packets go into buffer, background task processes them self._audio_queue: asyncio.Queue[Any] = asyncio.Queue() self._processing_task: Optional[asyncio.Task[Any]] = None self._shutdown_event = asyncio.Event() - + # Model instances (initialized in start()) self.vad = None self.whisper = None @@ -114,17 +127,17 @@ async def start(self): """Initialize models and prepare for turn detection.""" # Ensure model directory exists os.makedirs(self.model_dir, exist_ok=True) - + # Prepare models in parallel await asyncio.gather( self._prepare_silero_vad(), self._prepare_whisper(), self._prepare_vogent(), ) - + # Start background processing task self._processing_task = asyncio.create_task(self._process_audio_loop()) - + # Call parent start method await super().start() @@ -133,8 +146,10 @@ async def _prepare_silero_vad(self) -> None: path = os.path.join(self.model_dir, SILERO_ONNX_FILENAME) await ensure_model(path, SILERO_ONNX_URL) # Initialize VAD in thread pool to avoid blocking event loop - self.vad = await asyncio.to_thread( - lambda: SileroVAD(path, reset_interval_seconds=self.vad_reset_interval_seconds) # type: ignore + self.vad = await asyncio.to_thread( # type: ignore[func-returns-value] + lambda: SileroVAD( # type: ignore[arg-type] + path, reset_interval_seconds=self.vad_reset_interval_seconds + ) ) async def _prepare_whisper(self) -> None: @@ -142,7 +157,9 @@ async def _prepare_whisper(self) -> None: logger.info(f"Loading faster-whisper model: {self.whisper_model_size}") # Load whisper in thread pool to avoid blocking event loop self.whisper = await asyncio.to_thread( # type: ignore[func-returns-value] - lambda: WhisperModel(self.whisper_model_size, device="cpu", compute_type="int8") + lambda: WhisperModel( + self.whisper_model_size, device="cpu", compute_type="int8" + ) ) logger.info("Faster-whisper model loaded") @@ -162,7 +179,7 @@ async def _prepare_vogent(self) -> None: ) logger.info("Vogent turn detection model loaded") - async def process_audio( + async def detect_turn( self, audio_data: PcmData, participant: Participant, @@ -204,7 +221,7 @@ async def _process_audio_packet( ) -> None: """ Process audio packet through VAD -> Whisper -> Vogent pipeline. - + This method: 1. Buffers audio and processes in 512-sample chunks 2. Uses VAD to detect speech @@ -212,7 +229,7 @@ async def _process_audio_packet( 4. 
When reaching silence or max duration: - Transcribes segment with Whisper - Checks turn completion with Vogent (audio + text) - + Args: audio_data: PcmData object containing audio samples participant: Participant that's speaking @@ -239,8 +256,11 @@ async def _process_audio_packet( # Predict if this segment has speech if self.vad is None: continue - - speech_probability = self.vad.predict_speech(chunk.samples) + + with Timer(turn_vad_latency_ms) as timer: + timer.attributes["samples"] = len(chunk.samples) + timer.attributes["implementation"] = "vogent" + speech_probability = self.vad.predict_speech(chunk.samples) is_speech = speech_probability > self.speech_probability_threshold if self._active_segment is not None: @@ -256,7 +276,11 @@ async def _process_audio_packet( self._silence.trailing_silence_chunks += 1 trailing_silence_ms = ( - self._silence.trailing_silence_chunks * CHUNK / RATE * 1000 * 5 # DTX correction + self._silence.trailing_silence_chunks + * CHUNK + / RATE + * 1000 + * 5 # DTX correction ) long_silence = trailing_silence_ms > self._trailing_silence_ms max_duration_reached = ( @@ -272,20 +296,20 @@ async def _process_audio_packet( merged.append(self._pre_speech_buffer) merged.append(self._active_segment) merged = merged.tail(8, True, "start") - + # Transcribe the segment with Whisper transcription = await self._transcribe_segment(merged) - + # Get previous line from conversation for context prev_line = self._get_previous_line(conversation) - + # Check if turn is complete using Vogent (multimodal: audio + text) is_complete = await self._predict_turn_completed( merged, prev_line=prev_line, curr_line=transcription, ) - + if is_complete: self._emit_end_turn_event( TurnEndedEvent( @@ -303,7 +327,7 @@ async def _process_audio_packet( ) self._pre_speech_buffer.append(merged) self._pre_speech_buffer = self._pre_speech_buffer.tail(8) - + elif is_speech and self._active_segment is None: self._emit_start_turn_event(TurnStartedEvent(participant=participant)) # Create a new segment @@ -342,103 +366,116 @@ async def stop(self): async def _transcribe_segment(self, pcm: PcmData) -> str: """ Transcribe audio segment using faster-whisper. 
- + Args: pcm: PcmData containing audio samples - + Returns: Transcribed text """ - # Ensure it's 16khz and f32 format - pcm = pcm.resample(16000).to_float32() - audio_array = pcm.samples - - if self.whisper is None: - return "" - - # Run transcription in thread pool to avoid blocking - segments, info = await asyncio.to_thread( - self.whisper.transcribe, - audio_array, - language="en", - beam_size=1, - vad_filter=False, # We already did VAD - ) - - # Collect all text segments - text_parts = [] - for segment in segments: - text_parts.append(segment.text.strip()) - - transcription = " ".join(text_parts).strip() + with Timer(vogent_whisper_latency_ms) as timer: + # Ensure it's 16khz and f32 format + pcm = pcm.resample(16000).to_float32() + audio_array = pcm.samples + timer.attributes["audio_duration_ms"] = pcm.duration_ms + timer.attributes["samples"] = len(audio_array) + + if self.whisper is None: + return "" + + # Run transcription in thread pool to avoid blocking + segments, info = await asyncio.to_thread( + self.whisper.transcribe, + audio_array, + language="en", + beam_size=1, + vad_filter=False, # We already did VAD + ) + + # Collect all text segments + text_parts = [] + for segment in segments: + text_parts.append(segment.text.strip()) + + transcription = " ".join(text_parts).strip() + timer.attributes["transcription_length"] = len(transcription) + return transcription async def _predict_turn_completed( - self, - pcm: PcmData, + self, + pcm: PcmData, prev_line: str, curr_line: str, ) -> bool: """ Predict whether the current turn is complete using Vogent. - + Args: pcm: PcmData containing audio samples prev_line: Previous speaker's text (for context) curr_line: Current speaker's text - + Returns: True if turn is complete, False otherwise """ - # Ensure it's 16khz and f32 format - pcm = pcm.resample(16000).to_float32() - - # Truncate to 8 seconds - audio_array = pcm.tail(8, False).samples - - if self.vogent is None: - return False - - # Run vogent prediction in thread pool - result = await asyncio.to_thread( - self.vogent.predict, - audio_array, - prev_line=prev_line, - curr_line=curr_line, - sample_rate=16000, - return_probs=True, - ) - - # Check if probability exceeds threshold - is_complete = result['prob_endpoint'] > self.vogent_threshold - logger.debug( - f"Vogent probability: {result['prob_endpoint']:.3f}, " - f"threshold: {self.vogent_threshold}, is_complete: {is_complete}" - ) - + with Timer(turn_end_detection_latency_ms) as timer: + # Ensure it's 16khz and f32 format + pcm = pcm.resample(16000).to_float32() + + # Truncate to 8 seconds + audio_array = pcm.tail(8, False).samples + timer.attributes["implementation"] = "vogent" + timer.attributes["audio_duration_ms"] = len(audio_array) / 16000 * 1000 + timer.attributes["prev_line_length"] = len(prev_line) + timer.attributes["curr_line_length"] = len(curr_line) + + if self.vogent is None: + return False + + # Run vogent prediction in thread pool + result = await asyncio.to_thread( + self.vogent.predict, + audio_array, + prev_line=prev_line, + curr_line=curr_line, + sample_rate=16000, + return_probs=True, + ) + + # Check if probability exceeds threshold + is_complete = result["prob_endpoint"] > self.vogent_threshold + timer.attributes["probability"] = result["prob_endpoint"] + timer.attributes["is_complete"] = is_complete + + logger.debug( + f"Vogent probability: {result['prob_endpoint']:.3f}, " + f"threshold: {self.vogent_threshold}, is_complete: {is_complete}" + ) + return is_complete def _get_previous_line(self, conversation: 
Optional[Conversation]) -> str: """ Extract the previous speaker's line from conversation history. - + Args: conversation: Conversation object with message history - + Returns: Previous line text, or empty string if not available """ if conversation is None or not conversation.messages: return "" - + # Get the last message that's not from the current speaker # Typically this would be the assistant or another user for message in reversed(conversation.messages): if message.content and message.content.strip(): # Remove terminal punctuation for better vogent performance - text = message.content.strip().rstrip('.!?') + text = message.content.strip().rstrip(".!?") return text - + return "" @@ -446,20 +483,20 @@ def _get_previous_line(self, conversation: Optional[Conversation]) -> str: class SileroVAD: """ Minimal Silero VAD ONNX wrapper for 16 kHz, mono, chunk=512. - + Reused from smart_turn implementation. """ def __init__(self, model_path: str, reset_interval_seconds: float = 5.0): """ Initialize Silero VAD. - + Args: model_path: Path to the ONNX model file reset_interval_seconds: Reset internal state every N seconds to prevent drift """ import onnxruntime as ort - + opts = ort.SessionOptions() opts.inter_op_num_threads = 1 self.session = ort.InferenceSession(model_path, sess_options=opts) @@ -507,5 +544,3 @@ def predict_speech(self, chunk_f32: np.ndarray) -> float: # out shape is (1, 1) -> return scalar return float(out[0][0]) - - diff --git a/plugins/wizper/vision_agents/plugins/wizper/stt.py b/plugins/wizper/vision_agents/plugins/wizper/stt.py index ba65b717..fcea5584 100644 --- a/plugins/wizper/vision_agents/plugins/wizper/stt.py +++ b/plugins/wizper/vision_agents/plugins/wizper/stt.py @@ -59,7 +59,7 @@ def __init__( self.target_language = target_language self._fal_client = client if client is not None else fal_client.AsyncClient() - async def process_audio( + async def _process_audio( self, pcm_data: PcmData, participant: Optional["Participant"] = None, @@ -89,8 +89,7 @@ async def process_audio( # Create temporary file for upload (async to avoid blocking) temp_file_path = os.path.join( - tempfile.gettempdir(), - f"wizper_{os.getpid()}_{id(pcm_data)}.wav" + tempfile.gettempdir(), f"wizper_{os.getpid()}_{id(pcm_data)}.wav" ) async with aiofiles.open(temp_file_path, "wb") as f: await f.write(wav_data) @@ -119,8 +118,11 @@ async def process_audio( # Create a default participant if none provided if participant is None: from vision_agents.core.edge.types import Participant - participant = Participant(original=None, user_id="test-user") - + + participant = Participant( + original=None, user_id="test-user" + ) + response_metadata = TranscriptResponse() self._emit_transcript_event( text, participant, response_metadata diff --git a/plugins/xai/vision_agents/plugins/xai/llm.py b/plugins/xai/vision_agents/plugins/xai/llm.py index 5392ab64..4ed18bf0 100644 --- a/plugins/xai/vision_agents/plugins/xai/llm.py +++ b/plugins/xai/vision_agents/plugins/xai/llm.py @@ -5,7 +5,10 @@ from vision_agents.core.llm.llm import LLM, LLMResponseEvent from vision_agents.core.processors import Processor -from vision_agents.core.llm.events import LLMResponseChunkEvent, LLMResponseCompletedEvent +from vision_agents.core.llm.events import ( + LLMResponseChunkEvent, + LLMResponseCompletedEvent, +) from . 
import events if TYPE_CHECKING: @@ -56,6 +59,7 @@ def __init__( self.model = model self.xai_chat: Optional["Chat"] = None self.conversation = None + self.provider_name = "xai" if client is not None: self.client = client @@ -64,7 +68,7 @@ def __init__( else: self.client = AsyncClient() - async def simple_response( + async def _simple_response( self, text: str, processors: Optional[List[Processor]] = None, @@ -91,7 +95,9 @@ async def simple_response( instructions=instructions, ) - async def create_response(self, *args: Any, **kwargs: Any) -> LLMResponseEvent[Response]: + async def create_response( + self, *args: Any, **kwargs: Any + ) -> LLMResponseEvent[Response]: """ create_response gives you full support/access to the native xAI chat.sample() and chat.stream() methods this method wraps the xAI method and ensures we broadcast an event which the agent class hooks into @@ -139,10 +145,11 @@ async def create_response(self, *args: Any, **kwargs: Any) -> LLMResponseEvent[R self.xai_chat.append(response) if llm_response is not None: - self.events.send(LLMResponseCompletedEvent( - original=llm_response.original, - text=llm_response.text - )) + self.events.send( + LLMResponseCompletedEvent( + original=llm_response.original, text=llm_response.text + ) + ) return llm_response or LLMResponseEvent[Response]( Response(chat_pb2.GetChatCompletionResponse(), 0), "" @@ -170,31 +177,32 @@ def _standardize_and_emit_chunk( Forwards the chunk events and also send out a standardized version (the agent class hooks into that) """ # Emit the raw chunk event - self.events.send(events.XAIChunkEvent( - plugin_name="xai", - chunk=chunk - )) + self.events.send(events.XAIChunkEvent(plugin_name="xai", chunk=chunk)) # Emit standardized delta events for content if chunk.content: - self.events.send(LLMResponseChunkEvent( - content_index=0, # xAI doesn't have content_index - item_id=chunk.proto.id if hasattr(chunk.proto, "id") else "", - output_index=0, # xAI doesn't have output_index - sequence_number=0, # xAI doesn't have sequence_number - delta=chunk.content, - plugin_name="xai", - )) + self.events.send( + LLMResponseChunkEvent( + content_index=0, # xAI doesn't have content_index + item_id=chunk.proto.id if hasattr(chunk.proto, "id") else "", + output_index=0, # xAI doesn't have output_index + sequence_number=0, # xAI doesn't have sequence_number + delta=chunk.content, + plugin_name="xai", + ) + ) # Check if this is the final chunk (finish_reason indicates completion) if chunk.choices and chunk.choices[0].finish_reason: # This is the final chunk, return the complete response llm_response = LLMResponseEvent[Response](response, response.content) - self.events.send(LLMResponseCompletedEvent( - plugin_name="xai", - text=llm_response.text, - original=llm_response.original - )) + self.events.send( + LLMResponseCompletedEvent( + plugin_name="xai", + text=llm_response.text, + original=llm_response.original, + ) + ) return llm_response return None diff --git a/tests/test_function_calling.py b/tests/test_function_calling.py index c8418f66..25a022c8 100644 --- a/tests/test_function_calling.py +++ b/tests/test_function_calling.py @@ -1,12 +1,15 @@ """ Tests for function calling functionality. 
""" +from typing import Optional, List, Any import pytest from unittest.mock import Mock, patch +from vision_agents.core.edge.types import Participant from vision_agents.core.llm import FunctionRegistry, function_registry -from vision_agents.core.llm.llm import LLM +from vision_agents.core.llm.llm import LLM, LLMResponseEvent +from vision_agents.core.processors import Processor from vision_agents.plugins.openai import LLM as OpenAILLM from vision_agents.plugins.anthropic import LLM as ClaudeLLM from vision_agents.plugins.gemini import LLM as GeminiLLM @@ -125,12 +128,21 @@ def global_test_func(x: int) -> int: assert result == 12 +class TestLLM(LLM): + async def _simple_response( + self, + text: str, + processors: Optional[List[Processor]] = None, + participant: Optional[Participant] = None, + ) -> LLMResponseEvent[Any]: + return LLMResponseEvent(original=dict(), text="") + class TestLLMFunctionCalling: """Test LLM function calling functionality.""" async def test_llm_function_registration(self): """Test that LLM can register functions.""" - llm = LLM() + llm = TestLLM() @llm.register_function(description="Test function") def test_func(x: int) -> int: @@ -143,7 +155,7 @@ def test_func(x: int) -> int: async def test_llm_get_available_functions(self): """Test getting available functions from LLM.""" - llm = LLM() + llm = TestLLM() @llm.register_function(description="Function 1") def func1(x: int) -> int: @@ -417,7 +429,7 @@ class TestFunctionCallingIntegration: async def test_tool_call_processing(self): """Test processing tool calls with multiple functions.""" - llm = LLM() + llm = TestLLM() @llm.register_function(description="Get weather") def get_weather(location: str) -> str: @@ -440,7 +452,7 @@ def calculate_sum(a: int, b: int) -> int: async def test_error_handling_in_function_calls(self): """Test error handling in function calls.""" - llm = LLM() + llm = TestLLM() @llm.register_function(description="Test function that raises error") def error_function(x: int) -> int: @@ -458,7 +470,7 @@ def error_function(x: int) -> int: async def test_function_schema_generation(self): """Test that function schemas are generated correctly.""" - llm = LLM() + llm = TestLLM() @llm.register_function(description="Complex function") def complex_function( @@ -500,7 +512,7 @@ class TestConcurrentToolExecution: async def test_dedup_and_execute(self): """Test the _dedup_and_execute method.""" - llm = LLM() + llm = TestLLM() @llm.register_function(description="Test function") def test_func(x: int) -> int: @@ -533,7 +545,7 @@ async def test_tool_lifecycle_events(self): """Test that tool lifecycle events are emitted.""" from vision_agents.core.llm.events import ToolStartEvent, ToolEndEvent - llm = LLM() + llm = TestLLM() @llm.register_function(description="Test function") def test_func(x: int) -> int: @@ -567,7 +579,7 @@ async def track_end_event(event: ToolEndEvent): async def test_output_sanitization(self): """Test output sanitization for large responses.""" - llm = LLM() + llm = TestLLM() # Test normal output normal_output = "Hello world"