diff --git a/livekit-plugins/livekit-plugins-rtzr/README.md b/livekit-plugins/livekit-plugins-rtzr/README.md new file mode 100644 index 0000000000..3510b0c781 --- /dev/null +++ b/livekit-plugins/livekit-plugins-rtzr/README.md @@ -0,0 +1,55 @@ +# RTZR plugin for LiveKit Agents + +Support for RTZR Streaming STT via WebSocket interface, following the "Streaming STT" guide in the RTZR Developers docs. + +- Docs: `https://developers.rtzr.ai/docs/en/` (see Streaming STT) + +## Installation + +```bash +pip install livekit-plugins-rtzr +``` + +## Prerequisites + +Obtain `client_id` and `client_secret` from the RTZR Developers Console. + +Set credentials as environment variables: + +``` +RTZR_CLIENT_ID= +RTZR_CLIENT_SECRET= +``` + +``` +# Override base HTTP API URL (used for token issuance) +RTZR_API_BASE=https://openapi.vito.ai + +# Override WebSocket URL (used for live streaming) +RTZR_WEBSOCKET_URL=wss://openapi.vito.ai +``` + +If `RTZR_WEBSOCKET_URL` is not set, the plugin will derive it from `RTZR_API_BASE` by replacing the scheme with `wss://`. + +## Usage + +Use RTZR STT in an `AgentSession` or as a standalone streaming service. + +```python +from livekit.agents import AgentSession +from livekit.plugins import rtzr + +# Basic usage with env-based credentials +stt = rtzr.STT() + +session = AgentSession( + stt=stt, + # ... llm, tts, etc. +) +``` + +Notes: +- The WebSocket streaming endpoint accepts raw PCM frames when `encoding=LINEAR16`. +- The plugin relies on the server-side endpointing (EPD). You do not need to send finalize messages. +- When the pipeline closes the stream, the plugin sends `EOS` to end the session. + diff --git a/livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/__init__.py b/livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/__init__.py new file mode 100644 index 0000000000..6ba4a520f6 --- /dev/null +++ b/livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/__init__.py @@ -0,0 +1,32 @@ +"""RTZR plugin for LiveKit Agents + +See Streaming STT docs at: https://developers.rtzr.ai/docs/en/ + +Environment variables used: +- `RTZR_CLIENT_ID` / `RTZR_CLIENT_SECRET` for authentication (required) +""" + +from livekit.agents import Plugin + +from .log import logger +from .stt import STT +from .version import __version__ + +__all__ = ["STT", "__version__"] + + +class RTZRPlugin(Plugin): + def __init__(self): + super().__init__(__name__, __version__, __package__, logger) + + +Plugin.register_plugin(RTZRPlugin()) + +# Cleanup docs of unexported modules +_module = dir() +NOT_IN_ALL = [m for m in _module if m not in __all__] + +__pdoc__ = {} + +for n in NOT_IN_ALL: + __pdoc__[n] = False diff --git a/livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/log.py b/livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/log.py new file mode 100644 index 0000000000..84f3078734 --- /dev/null +++ b/livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/log.py @@ -0,0 +1,3 @@ +import logging + +logger = logging.getLogger("livekit.plugins.rtzr") diff --git a/livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/rtzrapi.py b/livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/rtzrapi.py new file mode 100644 index 0000000000..bc884a9a19 --- /dev/null +++ b/livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/rtzrapi.py @@ -0,0 +1,183 @@ +from __future__ import annotations + +import logging +import os +import time +from typing import Any + +import aiohttp + +logger = logging.getLogger(__name__) + + +class RTZRAPIError(Exception): + """Base exception for RTZR API errors.""" + + pass + + +class RTZRConnectionError(RTZRAPIError): + """Exception raised when connection to RTZR API fails.""" + + pass + + +class RTZRStatusError(RTZRAPIError): + """Exception raised when RTZR API returns an error status.""" + + def __init__(self, message: str, status_code: int | None = None): + super().__init__(message) + self.message = message + self.status_code = status_code + + +class RTZRTimeoutError(RTZRAPIError): + """Exception raised when RTZR API request times out.""" + + pass + + +DEFAULT_SAMPLE_RATE = 8000 + + +class RTZROpenAPIClient: + """RTZR OpenAPI Client for authentication and WebSocket streaming. + + This is an independent SDK client that can be used without livekit dependencies. + It supports both manual session management and async context manager usage. + + Example: + # Manual session management + client = RTZROpenAPIClient() + token = await client.get_token() + ws = await client.connect_websocket(config) + await client.close() + + # Context manager (recommended) + async with RTZROpenAPIClient() as client: + token = await client.get_token() + ws = await client.connect_websocket(config) + """ + + def __init__( + self, + client_id: str | None = None, + client_secret: str | None = None, + http_session: aiohttp.ClientSession | None = None, + ) -> None: + self._logger = logging.getLogger(__name__) + self.client_id = client_id or os.environ.get("RTZR_CLIENT_ID") + self.client_secret = client_secret or os.environ.get("RTZR_CLIENT_SECRET") + + if not (self.client_id and self.client_secret): + raise ValueError("RTZR_CLIENT_ID and RTZR_CLIENT_SECRET must be set") + + self._http_session = http_session + self._owns_session = http_session is None # Track if we own the session + self._token: dict[str, Any] | None = None + self._api_base = "https://openapi.vito.ai" + self._ws_base = "wss://" + self._api_base.split("://", 1)[1] + + async def __aenter__(self): + """Async context manager entry.""" + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit.""" + await self.close() + + async def get_token(self) -> str: + """Get a valid access token, refreshing if necessary.""" + if self._token is None or self._token["expire_at"] < time.time() - 3600: + await self._refresh_token() + return self._token["access_token"] + + async def _refresh_token(self) -> None: + """Refresh the access token.""" + sess = self._ensure_http_session() + url = f"{self._api_base}/v1/authenticate" + + try: + async with sess.post( + url, data={"client_id": self.client_id, "client_secret": self.client_secret} + ) as resp: + resp.raise_for_status() + data = await resp.json() + self._token = data + logger.debug("Successfully refreshed RTZR access token") + except aiohttp.ClientResponseError as e: + logger.error("RTZR authentication failed: %s %s", e.status, e.message) + raise RTZRStatusError( + message=f"Authentication failed: {e.message}", + status_code=e.status, + ) from e + except aiohttp.ClientError as e: + logger.error("RTZR authentication connection error: %s", e) + raise RTZRConnectionError("Failed to authenticate with RTZR API") from e + + def _ensure_http_session(self) -> aiohttp.ClientSession: + """Ensure HTTP session is available.""" + if not self._http_session: + self._http_session = aiohttp.ClientSession() + return self._http_session + + async def close(self) -> None: + """Close the HTTP session if we own it.""" + if self._http_session and self._owns_session: + await self._http_session.close() + self._http_session = None + + async def connect_websocket( + self, config: dict[str, str], headers: dict[str, str] | None = None + ) -> aiohttp.ClientWebSocketResponse: + """Connect to the streaming WebSocket endpoint.""" + # Build URL like reference client + query_string = "&".join(f"{k}={v}" for k, v in config.items()) + url = f"{self._ws_base}/v1/transcribe:streaming?{query_string}" + + # Prepare headers + token = await self.get_token() + ws_headers = {"Authorization": f"bearer {token}"} + if headers: + ws_headers.update(headers) + + session = self._ensure_http_session() + + try: + ws = await session.ws_connect(url, headers=ws_headers) + logger.debug("Connected to RTZR WebSocket at %s", url) + return ws + except aiohttp.ClientResponseError as e: + logger.error("RTZR WebSocket connection failed: %s %s", e.status, e.message) + raise RTZRStatusError( + message=f"WebSocket connection failed: {e.message}", + status_code=e.status, + ) from e + except aiohttp.ClientError as e: + logger.error("RTZR WebSocket client error: %s", e) + raise RTZRConnectionError("WebSocket connection failed") from e + + def build_config( + self, + model_name: str = "sommers_ko", + domain: str = "CALL", + sample_rate: int = DEFAULT_SAMPLE_RATE, + encoding: str = "LINEAR16", + epd_time: float = 0.5, + noise_threshold: float = 0.60, + active_threshold: float = 0.80, + use_punctuation: bool = False, + ) -> dict[str, str]: + """Build configuration dictionary for WebSocket connection.""" + config = { + "model_name": model_name, + "domain": domain, + "sample_rate": str(sample_rate), + "encoding": encoding, + "epd_time": str(epd_time), + "noise_threshold": str(noise_threshold), + "active_threshold": str(active_threshold), + "use_punctuation": "true" if use_punctuation else "false", + } + + return config diff --git a/livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/stt.py b/livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/stt.py new file mode 100644 index 0000000000..2f8cbe7e7b --- /dev/null +++ b/livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/stt.py @@ -0,0 +1,283 @@ +# Copyright 2023 LiveKit, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import asyncio +import json +from dataclasses import dataclass + +import aiohttp + +from livekit import rtc +from livekit.agents import ( + DEFAULT_API_CONNECT_OPTIONS, + APIConnectionError, + APIConnectOptions, + APIStatusError, + APITimeoutError, + stt, + utils, +) +from livekit.agents.types import ( + NOT_GIVEN, + NotGivenOr, +) + +from .log import logger +from .rtzrapi import DEFAULT_SAMPLE_RATE, RTZRConnectionError, RTZROpenAPIClient, RTZRStatusError + +_DEFAULT_CHUNK_MS = 100 + + +@dataclass +class _STTOptions: + model_name: str = "sommers_ko" # sommers_ko: "ko", sommers_ja: "ja" + language: str = "ko" # ko, ja, en + sample_rate: int = DEFAULT_SAMPLE_RATE + encoding: str = "LINEAR16" # or "OGG_OPUS" in future + domain: str = "CALL" # CALL, MEETING + epd_time: float = 0.3 + noise_threshold: float = 0.60 + active_threshold: float = 0.80 + use_punctuation: bool = False + + +class STT(stt.STT): + """RTZR Streaming STT over WebSocket. + + Uses RTZROpenAPIClient for authentication and WebSocket connection. + Audio frames streamed to `/v1/transcribe:streaming` endpoint. + Server performs endpoint detection (EPD), final messages carry `final=true`. + Stream is finalized by sending the string `EOS`. + """ + + def __init__( + self, + *, + model: str = "sommers_ko", + language: str = "ko", + sample_rate: int = 8000, + domain: str = "CALL", + epd_time: float = 0.3, + noise_threshold: float = 0.60, + active_threshold: float = 0.80, + use_punctuation: bool = False, + http_session: aiohttp.ClientSession | None = None, + ) -> None: + super().__init__(capabilities=stt.STTCapabilities(streaming=True, interim_results=True)) + + self._params = _STTOptions( + model_name=model, + language=language, + sample_rate=sample_rate, + domain=domain, + epd_time=epd_time, + noise_threshold=noise_threshold, + active_threshold=active_threshold, + use_punctuation=use_punctuation, + ) + self._client = RTZROpenAPIClient(http_session=http_session) + + async def aclose(self) -> None: + """Close the RTZR client and cleanup resources.""" + await self._client.close() + + async def _recognize_impl( + self, + buffer: utils.AudioBuffer, + *, + language: NotGivenOr[str] = NOT_GIVEN, + conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, + ) -> stt.SpeechEvent: + raise NotImplementedError("Single-shot recognition is not supported; use stream().") + + def stream( + self, + *, + language: NotGivenOr[str] = NOT_GIVEN, + conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, + ) -> SpeechStream: + return SpeechStream( + stt=self, + conn_options=conn_options, + ) + + +class SpeechStream(stt.SpeechStream): + def __init__(self, *, stt: STT, conn_options: APIConnectOptions) -> None: + super().__init__(stt=stt, conn_options=conn_options, sample_rate=stt._params.sample_rate) + self._stt = stt + self._ws: aiohttp.ClientWebSocketResponse | None = None + + async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse: + config = self._stt._client.build_config( + model_name=self._stt._params.model_name, + domain=self._stt._params.domain, + sample_rate=self._stt._params.sample_rate, + encoding=self._stt._params.encoding, + epd_time=self._stt._params.epd_time, + noise_threshold=self._stt._params.noise_threshold, + active_threshold=self._stt._params.active_threshold, + use_punctuation=self._stt._params.use_punctuation, + ) + + try: + ws = await asyncio.wait_for( + self._stt._client.connect_websocket(config), timeout=self._conn_options.timeout + ) + return ws + except asyncio.TimeoutError as e: + raise APITimeoutError("WebSocket connection timeout") from e + except RTZRStatusError as e: + logger.error("RTZR API status error: %s", e) + raise APIStatusError( + message=e.message, + status_code=e.status_code or 500, + request_id=None, + body=None, + ) from e + except RTZRConnectionError as e: + logger.error("RTZR API connection error: %s", e) + raise APIConnectionError("RTZR API connection failed") from e + + async def _run(self) -> None: + while True: + try: + self._ws = await self._connect_ws() + send_task = asyncio.create_task(self._send_audio_task()) + recv_task = asyncio.create_task(self._recv_task()) + try: + await asyncio.gather(send_task, recv_task) + finally: + await utils.aio.gracefully_cancel(send_task, recv_task) + except asyncio.TimeoutError as e: + logger.error("RTZR STT connection timeout: %s", e) + raise APITimeoutError() from e + except aiohttp.ClientResponseError as e: + logger.error("RTZR STT status error: %s %s", e.status, e.message) + raise APIStatusError( + message=e.message, status_code=e.status, request_id=None, body=None + ) from e + except aiohttp.ClientError as e: + logger.error("RTZR STT client error: %s", e) + raise APIConnectionError() from e + except Exception as e: + logger.exception("RTZR STT unexpected error: %s", e) + raise + finally: + if self._ws: + await self._ws.close() + self._ws = None + break + + @utils.log_exceptions(logger=logger) + async def _send_audio_task(self) -> None: + assert self._ws + # Bundle audio into appropriate chunks using AudioByteStream + audio_bstream = utils.audio.AudioByteStream( + sample_rate=self._stt._params.sample_rate, + num_channels=1, + samples_per_channel=self._stt._params.sample_rate // (1000 // _DEFAULT_CHUNK_MS), + ) + async for data in self._input_ch: + if isinstance(data, rtc.AudioFrame): + # Write audio frame data to the byte stream + frames = audio_bstream.write(data.data.tobytes()) + elif isinstance(data, self._FlushSentinel): + # Flush any remaining audio data + frames = audio_bstream.flush() + else: + frames = [] + + for frame in frames: + await self._ws.send_bytes(frame.data.tobytes()) + + await self._ws.send_str("EOS") + logger.info("Sent EOS to close audio stream") + + @utils.log_exceptions(logger=logger) + async def _recv_task(self) -> None: + assert self._ws + in_speech = False + async for msg in self._ws: + if msg.type == aiohttp.WSMsgType.TEXT: + try: + data = json.loads(msg.data) + except json.JSONDecodeError: + logger.warning("Non-JSON text from RTZR STT: %s", msg.data) + continue + + # Expected schema from reference: {"alternatives":[{"text": "..."}], "final": bool} + if "alternatives" in data and data["alternatives"]: + text = data["alternatives"][0].get("text", "") + is_final = bool(data.get("final", False)) + if text: + # Send START_OF_SPEECH if this is the first transcript in a sequence + if not in_speech: + in_speech = True + self._event_ch.send_nowait( + stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH) + ) + + # Send transcript event + event_type = ( + stt.SpeechEventType.FINAL_TRANSCRIPT + if is_final + else stt.SpeechEventType.INTERIM_TRANSCRIPT + ) + self._event_ch.send_nowait( + stt.SpeechEvent( + type=event_type, + alternatives=[ + stt.SpeechData(text=text, language=self._stt._params.language) + ], + ) + ) + + if is_final: + self._event_ch.send_nowait( + stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH) + ) + in_speech = False + + # Handle error messages + if "error" in data: + error_msg = data["error"] + raise APIStatusError( + message=f"Server error: {error_msg}", + status_code=500, + request_id=None, + body=None, + ) + elif data.get("type") == "error" and "message" in data: + error_msg = data["message"] + raise APIStatusError( + message=f"Server error: {error_msg}", + status_code=500, + request_id=None, + body=None, + ) + + elif msg.type in ( + aiohttp.WSMsgType.CLOSE, + aiohttp.WSMsgType.CLOSING, + aiohttp.WSMsgType.CLOSED, + ): + break + elif msg.type == aiohttp.WSMsgType.ERROR: + logger.error("WebSocket error: %s", self._ws.exception()) + raise APIConnectionError("WebSocket error occurred") + else: + logger.debug("Ignored WebSocket message type: %s", msg.type) diff --git a/livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/version.py b/livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/version.py new file mode 100644 index 0000000000..5cc179e20f --- /dev/null +++ b/livekit-plugins/livekit-plugins-rtzr/livekit/plugins/rtzr/version.py @@ -0,0 +1 @@ +__version__ = "1.2.8" diff --git a/livekit-plugins/livekit-plugins-rtzr/pyproject.toml b/livekit-plugins/livekit-plugins-rtzr/pyproject.toml new file mode 100644 index 0000000000..330af15472 --- /dev/null +++ b/livekit-plugins/livekit-plugins-rtzr/pyproject.toml @@ -0,0 +1,43 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "livekit-plugins-rtzr" +dynamic = ["version"] +description = "Agent Framework plugin for RTZR Streaming STT" +readme = "README.md" +license = "Apache-2.0" +requires-python = ">=3.9.0" +authors = [{ name = "RTZR", email = "support@rtzr.ai" }] +keywords = ["webrtc", "realtime", "audio", "video", "livekit", "rtzr", "speech-to-text"] +classifiers = [ + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Topic :: Multimedia :: Sound/Audio", + "Topic :: Multimedia :: Video", + "Topic :: Scientific/Engineering :: Artificial Intelligence", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3 :: Only", +] +dependencies = [ + "livekit-agents>=1.2.8", + "httpx>=0.24.0", + "aiohttp>=3.9.0", +] + +[project.urls] +Documentation = "https://docs.livekit.io" +Website = "https://livekit.io/" +Source = "https://github.com/livekit/agents" + +[tool.hatch.version] +path = "livekit/plugins/rtzr/version.py" + +[tool.hatch.build.targets.wheel] +packages = ["livekit"] + +[tool.hatch.build.targets.sdist] +include = ["/livekit"]