2 changes: 2 additions & 0 deletions agents-core/vision_agents/core/agents/agents.py
@@ -537,6 +537,8 @@ async def join(self, call: Call, wait_for_participant=True) -> "AgentSessionCont

         # wait for conversation creation coro at the very end of the join flow
         self.conversation = await create_conversation_coro
+        # Provide conversation to the LLM so it can access the chat history.
+        self.llm.set_conversation(self.conversation)
 
         if wait_for_participant:
             self.logger.info("Agent is ready, waiting for participant to join")
19 changes: 16 additions & 3 deletions agents-core/vision_agents/core/llm/llm.py
@@ -26,7 +26,7 @@
 from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import Participant
 from getstream.video.rtc import AudioStreamTrack, PcmData
 from vision_agents.core.processors import Processor
-from vision_agents.core.utils.utils import parse_instructions
+from vision_agents.core.utils.utils import Instructions, parse_instructions
 from vision_agents.core.events.manager import EventManager
 from .function_registry import FunctionRegistry
 from .llm_types import ToolSchema, NormalizedToolCallItem
@@ -50,7 +50,6 @@ class LLM(abc.ABC):
     before_response_listener: BeforeCb
     after_response_listener: AfterCb
     agent: Optional["Agent"]
-    _conversation: Optional["Conversation"]
     function_registry: FunctionRegistry
 
     def __init__(self):
@@ -59,6 +58,9 @@ def __init__(self):
         self.events = EventManager()
         self.events.register_events_from_module(events)
         self.function_registry = FunctionRegistry()
+        self.instructions: Optional[str] = None
+        self.parsed_instructions: Optional[Instructions] = None
+        self._conversation: Optional[Conversation] = None

     async def warmup(self) -> None:
         """
@@ -187,9 +189,20 @@ def _attach_agent(self, agent: Agent):
         Attach agent to the llm
         """
         self.agent = agent
-        self._conversation = agent.conversation
         self._set_instructions(agent.instructions)
 
+    def set_conversation(self, conversation: Conversation):
+        """
+        Provide the Conversation object to the LLM so it can access the chat history.
+
+        To be called by the Agent after it joins the call.
+
+        Args:
+            conversation: a Conversation object.
+        """
+        self._conversation = conversation
+
     def _set_instructions(self, instructions: str):
         self.instructions = instructions

46 changes: 41 additions & 5 deletions agents-core/vision_agents/core/utils/video_utils.py
@@ -1,27 +1,63 @@
"""Video frame utilities."""

import io

import av
from PIL.Image import Resampling


def ensure_even_dimensions(frame: av.VideoFrame) -> av.VideoFrame:
"""
Ensure frame has even dimensions for H.264 yuv420p encoding.
Crops by 1 pixel if width or height is odd.
"""
needs_width_adjust = frame.width % 2 != 0
needs_height_adjust = frame.height % 2 != 0

if not needs_width_adjust and not needs_height_adjust:
return frame

new_width = frame.width - (1 if needs_width_adjust else 0)
new_height = frame.height - (1 if needs_height_adjust else 0)

cropped = frame.reformat(width=new_width, height=new_height)
cropped.pts = frame.pts
if frame.time_base is not None:
cropped.time_base = frame.time_base

return cropped


def frame_to_jpeg_bytes(
frame: av.VideoFrame, target_width: int, target_height: int, quality: int = 85
) -> bytes:
"""
Convert a video frame to JPEG bytes with resizing.
Args:
frame: an instance of `av.VideoFrame`.
target_width: target width in pixels.
target_height: target height in pixels.
quality: JPEG quality. Default is 85.
Returns: frame as JPEG bytes.
"""
# Convert frame to a PIL image
img = frame.to_image()

# Calculate scaling to maintain aspect ratio
src_width, src_height = img.size
# Calculate scale factor (fit within target dimensions)
scale = min(target_width / src_width, target_height / src_height)
new_width = int(src_width * scale)
new_height = int(src_height * scale)

# Resize with aspect ratio maintained
resized = img.resize((new_width, new_height), Resampling.LANCZOS)

# Save as JPEG with quality control
buf = io.BytesIO()
resized.save(buf, "JPEG", quality=quality, optimize=True)
return buf.getvalue()
5 changes: 2 additions & 3 deletions plugins/anthropic/tests/test_anthropic_llm.py
@@ -18,7 +18,7 @@ class TestClaudeLLM:
     async def llm(self) -> ClaudeLLM:
         """Test ClaudeLLM initialization with a provided client."""
         llm = ClaudeLLM(model="claude-sonnet-4-20250514")
-        llm._conversation = InMemoryConversation("be friendly", [])
+        llm.set_conversation(InMemoryConversation("be friendly", []))
         return llm

     @pytest.mark.asyncio
@@ -58,7 +58,7 @@ async def test_native_api(self, llm: ClaudeLLM):
     @pytest.mark.integration
     async def test_stream(self, llm: ClaudeLLM):
         streamingWorks = False
 
         @llm.events.subscribe
         async def passed(event: LLMResponseChunkEvent):
             nonlocal streamingWorks
@@ -70,7 +70,6 @@ async def passed(event: LLMResponseChunkEvent):

         assert streamingWorks
 
-
     @pytest.mark.integration
     async def test_memory(self, llm: ClaudeLLM):
         await llm.simple_response(
2 changes: 1 addition & 1 deletion plugins/aws/tests/test_aws.py
@@ -35,7 +35,7 @@ def assert_response_successful(self, response):
     async def llm(self) -> BedrockLLM:
         """Test BedrockLLM initialization with a provided client."""
         llm = BedrockLLM(model="qwen.qwen3-32b-v1:0", region_name="us-east-1")
-        llm._conversation = InMemoryConversation("be friendly", [])
+        llm.set_conversation(InMemoryConversation("be friendly", []))
         return llm

     @pytest.mark.asyncio
20 changes: 10 additions & 10 deletions plugins/gemini/tests/test_gemini_llm.py
@@ -14,9 +14,7 @@
 load_dotenv()
 
 
-
 class TestGeminiLLM:
-
     def test_message(self):
         messages = GeminiLLM._normalize_message("say hi")
         assert isinstance(messages[0], Message)
@@ -32,7 +30,7 @@ def test_advanced_message(self):
     @pytest.fixture
     async def llm(self) -> GeminiLLM:
         llm = GeminiLLM(model="gemini-2.0-flash-exp")
-        llm._conversation = InMemoryConversation("be friendly", [])
+        llm.set_conversation(InMemoryConversation("be friendly", []))
         return llm

     @pytest.mark.integration
@@ -51,14 +49,14 @@ async def test_native_api(self, llm: GeminiLLM):
     @pytest.mark.integration
     async def test_stream(self, llm: GeminiLLM):
         streamingWorks = False
 
         @llm.events.subscribe
         async def passed(event: LLMResponseChunkEvent):
             nonlocal streamingWorks
             streamingWorks = True
 
         await llm.simple_response("Explain magma to a 5 year old")
 
         # Wait for all events in queue to be processed
         await llm.events.wait()
 
@@ -67,7 +65,9 @@ async def passed(event: LLMResponseChunkEvent):
     @pytest.mark.integration
     async def test_memory(self, llm: GeminiLLM):
         await llm.simple_response(text="There are 2 dogs in the room")
-        response = await llm.simple_response(text="How many paws are there in the room?")
+        response = await llm.simple_response(
+            text="How many paws are there in the room?"
+        )
 
         assert "8" in response.text or "eight" in response.text

@@ -82,7 +82,7 @@ async def test_native_memory(self, llm: GeminiLLM):
     @pytest.mark.integration
     async def test_instruction_following(self):
         llm = GeminiLLM(model="gemini-2.0-flash-exp")
-        llm._conversation = InMemoryConversation("be friendly", [])
+        llm.set_conversation(InMemoryConversation("be friendly", []))
 
         llm._set_instructions("only reply in 2 letter country shortcuts")

@@ -165,11 +165,11 @@ async def handle_error_event(event: events.GeminiErrorEvent):
             )
             chunk_item_ids.add(chunk_event.item_id)
             total_delta_text += chunk_event.delta
 
             # Validate content_index: should be sequential (0, 1, 2, ...) or None
             if chunk_event.content_index is not None:
                 content_indices.append(chunk_event.content_index)
 
         # Verify content_index sequencing if any are provided
         if content_indices:
             # Should be sequential starting from 0
2 changes: 1 addition & 1 deletion plugins/openai/README.md
@@ -16,7 +16,7 @@ pip install getstream-plugins-openai
 ## Usage
 
 ```python
-from getstream.plugins.openai import OpenAIRealtime
+from vision_agents.plugins.openai import Realtime
 
 # Initialize with API key
 sts = OpenAIRealtime(api_key="your_openai_api_key", voice="alloy")
Comment on lines +19 to 22

⚠️ Potential issue | 🔴 Critical

Fix import/usage mismatch for Realtime.

Line 19 imports Realtime, but line 22 still references OpenAIRealtime. This will cause a NameError at runtime.

Apply this diff:

 # Initialize with API key
-sts = OpenAIRealtime(api_key="your_openai_api_key", voice="alloy")
+sts = Realtime(api_key="your_openai_api_key", voice="alloy")
🤖 Prompt for AI Agents
In plugins/openai/README.md around lines 19 to 22, the example imports Realtime
but instantiates OpenAIRealtime, causing a NameError. Update the instantiation
to use Realtime (i.e., replace OpenAIRealtime(...) with Realtime(...)), or
alternatively change the import to OpenAIRealtime. Ensure the class name used
when creating the instance matches the imported identifier.

120 changes: 120 additions & 0 deletions plugins/openai/examples/qwen_vl_example/README.md
@@ -0,0 +1,120 @@
# Qwen3-VL hosted on Baseten
Qwen3-VL is the latest open-source vision language model (VLM) from Alibaba.
This plugin allows developers to easily run the model hosted on [Baseten](https://www.baseten.co/) with Vision Agents.
The model accepts text and video and responds with text, vocalised by the TTS service of your choice.

## Features

- **Video understanding**: Automatically buffers and forwards video frames to Baseten-hosted VLM models
- **Streaming responses**: Supports streaming text responses with real-time chunk events
- **Frame buffering**: Configurable frame rate and buffer duration for optimal performance
- **Event-driven**: Emits LLM events (chunks, completion, errors) for integration with other components

## Installation

```bash
uv add vision-agents[openai]
```

## Quick Start

```python
from vision_agents.core import Agent, User
from vision_agents.plugins import openai, getstream, deepgram, elevenlabs, vogent

async def create_agent(**kwargs) -> Agent:
# Initialize the Baseten VLM
# The api key and base url can be passed via OPENAI_API_KEY and OPENAI_BASE_URL environment variables.
llm = openai.ChatCompletionsVLM(model="qwen3vl")

# Create an agent with video understanding capabilities
agent = Agent(
edge=getstream.Edge(),
agent_user=User(name="Video Assistant", id="agent"),
instructions="You're a helpful video AI assistant. Analyze the video frames and respond to user questions about what you see.",
llm=llm,
stt=deepgram.STT(),
tts=elevenlabs.TTS(),
turn_detection=vogent.TurnDetection(),
processors=[],
)
return agent

async def join_call(agent: Agent, call_type: str, call_id: str, **kwargs) -> None:
await agent.create_user()
call = await agent.create_call(call_type, call_id)

with await agent.join(call):
# The agent will automatically process video frames and respond to user input
await agent.finish()
```

## Configuration

### Environment Variables

- **`OPENAI_API_KEY`**: Your Baseten API key (required)
- **`OPENAI_BASE_URL`**: The base URL for your Baseten API endpoint (required)

### Initialization Parameters

```python
openai.ChatCompletionsVLM(
model: str, # Baseten model name (e.g., "qwen3vl")
api_key: Optional[str] = None, # API key (defaults to OPENAI_API_KEY env var)
base_url: Optional[str] = None, # Base URL (defaults to OPENAI_BASE_URL env var)
fps: int = 1, # Frames per second to process (default: 1)
frame_buffer_seconds: int = 10, # Seconds of video to buffer (default: 10)
client: Optional[AsyncOpenAI] = None, # Custom OpenAI client (optional)
)
```

### Parameters

- **`model`**: The name of the Baseten-hosted model to use. Must be a vision-capable model.
- **`api_key`**: Your Baseten API key. If not provided, reads from `OPENAI_API_KEY` environment variable.
- **`base_url`**: The base URL for Baseten API. If not provided, reads from `OPENAI_BASE_URL` environment variable.
- **`fps`**: Number of video frames per second to capture and send to the model. Lower values reduce API costs but may miss fast-moving content. Default is 1 fps.
- **`frame_buffer_seconds`**: How many seconds of video to buffer. Total buffer size = `fps * frame_buffer_seconds`. Default is 10 seconds.
- **`client`**: Optional pre-configured `AsyncOpenAI` client. If provided, `api_key` and `base_url` are ignored.
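
If you already manage an OpenAI-compatible client, you can inject it instead of relying on environment variables. A minimal sketch (assumes `OPENAI_API_KEY` and `OPENAI_BASE_URL` are set; as documented above, `api_key` and `base_url` are ignored when `client` is passed):

```python
import os

from openai import AsyncOpenAI
from vision_agents.plugins import openai

# Pre-configured client pointing at your Baseten deployment.
client = AsyncOpenAI(
    api_key=os.environ["OPENAI_API_KEY"],
    base_url=os.environ["OPENAI_BASE_URL"],
)

# fps and frame_buffer_seconds keep the defaults documented above.
llm = openai.ChatCompletionsVLM(model="qwen3vl", client=client)
```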

## How It Works

1. **Video Frame Buffering**: The plugin automatically subscribes to video tracks when the agent joins a call. It buffers frames at the specified FPS for the configured duration.

2. **Frame Processing**: When responding to user input, the plugin:
   - Resizes buffered frames to fit within 800x600 (maintaining aspect ratio)
   - Encodes the resized frames as JPEG
   - Wraps the JPEG bytes in base64 data URLs (see the sketch after this list)

3. **API Request**: Sends the conversation history (including system instructions) along with all buffered frames to the Baseten model.

4. **Streaming Response**: Processes the streaming response and emits events for each chunk and completion.
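
A minimal sketch of the frame-processing step, reusing the `frame_to_jpeg_bytes` helper from `vision_agents.core.utils.video_utils` added in this PR; the surrounding wiring is illustrative, not the plugin's actual internals:

```python
import base64

import av

from vision_agents.core.utils.video_utils import frame_to_jpeg_bytes


def frame_to_data_url(frame: av.VideoFrame) -> str:
    """Encode one buffered frame as a base64 JPEG data URL."""
    # Fit within 800x600 while keeping the aspect ratio, then compress to JPEG.
    jpeg = frame_to_jpeg_bytes(frame, target_width=800, target_height=600)
    b64 = base64.b64encode(jpeg).decode("ascii")
    return f"data:image/jpeg;base64,{b64}"


def to_image_part(frame: av.VideoFrame) -> dict:
    # Chat Completions-style image content part for one buffered frame.
    return {"type": "image_url", "image_url": {"url": frame_to_data_url(frame)}}
```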

## Events

The plugin emits the following events:

- **`LLMResponseChunkEvent`**: Emitted for each text chunk in the streaming response
- **`LLMResponseCompletedEvent`**: Emitted when the response stream completes
- **`LLMErrorEvent`**: Emitted if an API request fails
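
A short sketch of subscribing to chunk events, following the `@llm.events.subscribe` pattern used in this repo's tests (the exact import path for the event class is an assumption):

```python
from vision_agents.core.llm.events import LLMResponseChunkEvent  # assumed path
from vision_agents.plugins import openai

llm = openai.ChatCompletionsVLM(model="qwen3vl")

@llm.events.subscribe
async def on_chunk(event: LLMResponseChunkEvent):
    # Fires once per streamed text chunk; `delta` holds the new text.
    print(event.delta, end="", flush=True)
```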

## Requirements

- Python 3.10+
- `openai>=2.5.0`
- `vision-agents` (core framework)
- Baseten API key and base URL

## Notes

- **Frame Rate**: The default FPS of 1 is optimized for VLM use cases. Higher FPS values will increase API costs and latency.
- **Frame Size**: Frames are automatically resized to 800x600 pixels while maintaining aspect ratio to optimize API payload size.
- **Buffer Duration**: The 10-second default buffer provides context for the model while keeping memory usage reasonable.
- **Tool Calling**: Tool/function calling support is not yet implemented (see TODOs in code).

## Troubleshooting

- **No video processing**: Ensure the agent has joined a call with video tracks available. The plugin automatically subscribes to video when tracks are added.
- **API errors**: Verify your `OPENAI_API_KEY` and `OPENAI_BASE_URL` are set correctly and the model name is valid.
- **High latency**: Consider reducing `fps` or `frame_buffer_seconds` to decrease the number of frames sent per request.
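
For the latency case, a lighter configuration that bounds the per-request frame count, using only the parameters documented above:

```python
from vision_agents.plugins import openai

# 1 fps over a 5-second buffer = at most 5 frames per request.
llm = openai.ChatCompletionsVLM(model="qwen3vl", fps=1, frame_buffer_seconds=5)
```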
21 changes: 21 additions & 0 deletions plugins/openai/examples/qwen_vl_example/pyproject.toml
@@ -0,0 +1,21 @@
[project]
name = "qwen3-vl-example"
version = "0.1.0"
description = "Example using Qwen3 VL hosted on Baseten with Vision Agents"
requires-python = ">=3.10"
dependencies = [
"vision-agents",
"vision-agents-plugins-openai",
"vision-agents-plugins-getstream",
"vision-agents-plugins-deepgram",
"vision-agents-plugins-elevenlabs",
"python-dotenv",
]

[tool.uv.sources]
vision-agents = { workspace = true }
vision-agents-plugins-openai = { workspace = true }
vision-agents-plugins-elevenlabs = { workspace = true }
vision-agents-plugins-getstream = { workspace = true }
vision-agents-plugins-deepgram = { workspace = true }
