Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions agents-core/vision_agents/core/agents/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,11 +537,15 @@ async def join(self, call: Call, wait_for_participant=True) -> "AgentSessionCont

# wait for conversation creation coro at the very end of the join flow
self.conversation = await create_conversation_coro
# Provide conversation to the LLM so it can access the chat history.
self.llm.set_conversation(self.conversation)

if wait_for_participant:
self.logger.info("Agent is ready, waiting for participant to join")
await self.wait_for_participant()

# Provide conversation to the LLM so it can access the chat history.
self.llm.set_conversation(self.conversation)
return AgentSessionContextManager(self, self._connection)

async def wait_for_participant(self):
Expand Down
19 changes: 16 additions & 3 deletions agents-core/vision_agents/core/llm/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import Participant
from getstream.video.rtc import AudioStreamTrack, PcmData
from vision_agents.core.processors import Processor
from vision_agents.core.utils.utils import parse_instructions
from vision_agents.core.utils.utils import Instructions, parse_instructions
from vision_agents.core.events.manager import EventManager
from .function_registry import FunctionRegistry
from .llm_types import ToolSchema, NormalizedToolCallItem
Expand All @@ -50,7 +50,6 @@ class LLM(abc.ABC):
before_response_listener: BeforeCb
after_response_listener: AfterCb
agent: Optional["Agent"]
_conversation: Optional["Conversation"]
function_registry: FunctionRegistry

def __init__(self):
Expand All @@ -59,6 +58,9 @@ def __init__(self):
self.events = EventManager()
self.events.register_events_from_module(events)
self.function_registry = FunctionRegistry()
self.instructions: Optional[str] = None
self.parsed_instructions: Optional[Instructions] = None
self._conversation: Optional[Conversation] = None

async def warmup(self) -> None:
"""
Expand Down Expand Up @@ -187,9 +189,20 @@ def _attach_agent(self, agent: Agent):
Attach agent to the llm
"""
self.agent = agent
self._conversation = agent.conversation
self._set_instructions(agent.instructions)

def set_conversation(self, conversation: Conversation):
"""
Provide the Conversation object to the LLM to access the chat history.
To be called by the Agent after it joins the call.

Args:
conversation: a Conversation object

Returns:
"""
self._conversation = conversation

def _set_instructions(self, instructions: str):
self.instructions = instructions

Expand Down
46 changes: 41 additions & 5 deletions agents-core/vision_agents/core/utils/video_utils.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,63 @@
"""Video frame utilities."""

import io

import av
from PIL.Image import Resampling


def ensure_even_dimensions(frame: av.VideoFrame) -> av.VideoFrame:
"""
Ensure frame has even dimensions for H.264 yuv420p encoding.
Crops by 1 pixel if width or height is odd.
"""
needs_width_adjust = frame.width % 2 != 0
needs_height_adjust = frame.height % 2 != 0

if not needs_width_adjust and not needs_height_adjust:
return frame

new_width = frame.width - (1 if needs_width_adjust else 0)
new_height = frame.height - (1 if needs_height_adjust else 0)

cropped = frame.reformat(width=new_width, height=new_height)
cropped.pts = frame.pts
if frame.time_base is not None:
cropped.time_base = frame.time_base

return cropped


def frame_to_jpeg_bytes(
frame: av.VideoFrame, target_width: int, target_height: int, quality: int = 85
) -> bytes:
"""
Convert a video frame to JPEG bytes with resizing.
Args:
frame: an instance of `av.VideoFrame`.
target_width: target width in pixels.
target_height: target height in pixels.
quality: JPEG quality. Default is 85.
Returns: frame as JPEG bytes.
"""
# Convert frame to a PIL image
img = frame.to_image()

# Calculate scaling to maintain aspect ratio
src_width, src_height = img.size
# Calculate scale factor (fit within target dimensions)
scale = min(target_width / src_width, target_height / src_height)
new_width = int(src_width * scale)
new_height = int(src_height * scale)

# Resize with aspect ratio maintained
resized = img.resize((new_width, new_height), Resampling.LANCZOS)

# Save as JPEG with quality control
buf = io.BytesIO()
resized.save(buf, "JPEG", quality=quality, optimize=True)
return buf.getvalue()
5 changes: 2 additions & 3 deletions plugins/anthropic/tests/test_anthropic_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class TestClaudeLLM:
async def llm(self) -> ClaudeLLM:
"""Test ClaudeLLM initialization with a provided client."""
llm = ClaudeLLM(model="claude-sonnet-4-20250514")
llm._conversation = InMemoryConversation("be friendly", [])
llm.set_conversation(InMemoryConversation("be friendly", []))
return llm

@pytest.mark.asyncio
Expand Down Expand Up @@ -58,7 +58,7 @@ async def test_native_api(self, llm: ClaudeLLM):
@pytest.mark.integration
async def test_stream(self, llm: ClaudeLLM):
streamingWorks = False

@llm.events.subscribe
async def passed(event: LLMResponseChunkEvent):
nonlocal streamingWorks
Expand All @@ -70,7 +70,6 @@ async def passed(event: LLMResponseChunkEvent):

assert streamingWorks


@pytest.mark.integration
async def test_memory(self, llm: ClaudeLLM):
await llm.simple_response(
Expand Down
2 changes: 1 addition & 1 deletion plugins/aws/tests/test_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def assert_response_successful(self, response):
async def llm(self) -> BedrockLLM:
"""Test BedrockLLM initialization with a provided client."""
llm = BedrockLLM(model="qwen.qwen3-32b-v1:0", region_name="us-east-1")
llm._conversation = InMemoryConversation("be friendly", [])
llm.set_conversation(InMemoryConversation("be friendly", []))
return llm

@pytest.mark.asyncio
Expand Down
20 changes: 10 additions & 10 deletions plugins/gemini/tests/test_gemini_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
load_dotenv()



class TestGeminiLLM:

def test_message(self):
messages = GeminiLLM._normalize_message("say hi")
assert isinstance(messages[0], Message)
Expand All @@ -32,7 +30,7 @@ def test_advanced_message(self):
@pytest.fixture
async def llm(self) -> GeminiLLM:
llm = GeminiLLM(model="gemini-2.0-flash-exp")
llm._conversation = InMemoryConversation("be friendly", [])
llm.set_conversation(InMemoryConversation("be friendly", []))
return llm

@pytest.mark.integration
Expand All @@ -51,14 +49,14 @@ async def test_native_api(self, llm: GeminiLLM):
@pytest.mark.integration
async def test_stream(self, llm: GeminiLLM):
streamingWorks = False

@llm.events.subscribe
async def passed(event: LLMResponseChunkEvent):
nonlocal streamingWorks
streamingWorks = True

await llm.simple_response("Explain magma to a 5 year old")

# Wait for all events in queue to be processed
await llm.events.wait()

Expand All @@ -67,7 +65,9 @@ async def passed(event: LLMResponseChunkEvent):
@pytest.mark.integration
async def test_memory(self, llm: GeminiLLM):
await llm.simple_response(text="There are 2 dogs in the room")
response = await llm.simple_response(text="How many paws are there in the room?")
response = await llm.simple_response(
text="How many paws are there in the room?"
)

assert "8" in response.text or "eight" in response.text

Expand All @@ -82,7 +82,7 @@ async def test_native_memory(self, llm: GeminiLLM):
@pytest.mark.integration
async def test_instruction_following(self):
llm = GeminiLLM(model="gemini-2.0-flash-exp")
llm._conversation = InMemoryConversation("be friendly", [])
llm.set_conversation(InMemoryConversation("be friendly", []))

llm._set_instructions("only reply in 2 letter country shortcuts")

Expand Down Expand Up @@ -165,11 +165,11 @@ async def handle_error_event(event: events.GeminiErrorEvent):
)
chunk_item_ids.add(chunk_event.item_id)
total_delta_text += chunk_event.delta

# Validate content_index: should be sequential (0, 1, 2, ...) or None
if chunk_event.content_index is not None:
content_indices.append(chunk_event.content_index)

# Verify content_index sequencing if any are provided
if content_indices:
# Should be sequential starting from 0
Expand Down
2 changes: 1 addition & 1 deletion plugins/openai/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pip install getstream-plugins-openai
## Usage

```python
from getstream.plugins.openai import OpenAIRealtime
from vision_agents.plugins.openai import Realtime

# Initialize with API key
sts = OpenAIRealtime(api_key="your_openai_api_key", voice="alloy")
Expand Down
120 changes: 120 additions & 0 deletions plugins/openai/examples/qwen_vl_example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Qwen3-VL hosted on Baseten
Qwen3-VL is the latest open-source Video Language Model (VLM) from Alibaba.
This plugin allows developers to easily run the model hosted on [Baseten](https://www.baseten.co/) with Vision Agents.
The model accepts text and video and responds with text vocalised with the TTS service of your choice.

## Features

- **Video understanding**: Automatically buffers and forwards video frames to Baseten-hosted VLM models
- **Streaming responses**: Supports streaming text responses with real-time chunk events
- **Frame buffering**: Configurable frame rate and buffer duration for optimal performance
- **Event-driven**: Emits LLM events (chunks, completion, errors) for integration with other components

## Installation

```bash
uv add vision-agents[openai]
```

## Quick Start

```python
from vision_agents.core import Agent, User
from vision_agents.plugins import openai, getstream, deepgram, elevenlabs, vogent

async def create_agent(**kwargs) -> Agent:
# Initialize the Baseten VLM
# The api key and base url can be passed via OPENAI_API_KEY and OPENAI_BASE_URL environment variables.
llm = openai.ChatCompletionsVLM(model="qwen3vl")

# Create an agent with video understanding capabilities
agent = Agent(
edge=getstream.Edge(),
agent_user=User(name="Video Assistant", id="agent"),
instructions="You're a helpful video AI assistant. Analyze the video frames and respond to user questions about what you see.",
llm=llm,
stt=deepgram.STT(),
tts=elevenlabs.TTS(),
turn_detection=vogent.TurnDetection(),
processors=[],
)
return agent

async def join_call(agent: Agent, call_type: str, call_id: str, **kwargs) -> None:
await agent.create_user()
call = await agent.create_call(call_type, call_id)

with await agent.join(call):
# The agent will automatically process video frames and respond to user input
await agent.finish()
```

## Configuration

### Environment Variables

- **`OPENAI_API_KEY`**: Your Baseten API key (required)
- **`OPENAI_BASE_URL`**: The base URL for your Baseten API endpoint (required)

### Initialization Parameters

```python
openai.ChatCompletionsVLM(
model: str, # Baseten model name (e.g., "qwen3vl")
api_key: Optional[str] = None, # API key (defaults to OPENAI_API_KEY env var)
base_url: Optional[str] = None, # Base URL (defaults to OPENAI_BASE_URL env var)
fps: int = 1, # Frames per second to process (default: 1)
frame_buffer_seconds: int = 10, # Seconds of video to buffer (default: 10)
client: Optional[AsyncOpenAI] = None, # Custom OpenAI client (optional)
)
```

### Parameters

- **`model`**: The name of the Baseten-hosted model to use. Must be a vision-capable model.
- **`api_key`**: Your Baseten API key. If not provided, reads from `OPENAI_API_KEY` environment variable.
- **`base_url`**: The base URL for Baseten API. If not provided, reads from `OPENAI_BASE_URL` environment variable.
- **`fps`**: Number of video frames per second to capture and send to the model. Lower values reduce API costs but may miss fast-moving content. Default is 1 fps.
- **`frame_buffer_seconds`**: How many seconds of video to buffer. Total buffer size = `fps * frame_buffer_seconds`. Default is 10 seconds.
- **`client`**: Optional pre-configured `AsyncOpenAI` client. If provided, `api_key` and `base_url` are ignored.

## How It Works

1. **Video Frame Buffering**: The plugin automatically subscribes to video tracks when the agent joins a call. It buffers frames at the specified FPS for the configured duration.

2. **Frame Processing**: When responding to user input, the plugin:
- Converts buffered video frames to JPEG format
- Resizes frames to 800x600 (maintaining aspect ratio)
- Encodes frames as base64 data URLs

3. **API Request**: Sends the conversation history (including system instructions) along with all buffered frames to the Baseten model.

4. **Streaming Response**: Processes the streaming response and emits events for each chunk and completion.

## Events

The plugin emits the following events:

- **`LLMResponseChunkEvent`**: Emitted for each text chunk in the streaming response
- **`LLMResponseCompletedEvent`**: Emitted when the response stream completes
- **`LLMErrorEvent`**: Emitted if an API request fails

## Requirements

- Python 3.10+
- `openai>=2.5.0`
- `vision-agents` (core framework)
- Baseten API key and base URL

## Notes

- **Frame Rate**: The default FPS of 1 is optimized for VLM use cases. Higher FPS values will increase API costs and latency.
- **Frame Size**: Frames are automatically resized to 800x600 pixels while maintaining aspect ratio to optimize API payload size.
- **Buffer Duration**: The 10-second default buffer provides context for the model while keeping memory usage reasonable.
- **Tool Calling**: Tool/function calling support is not yet implemented (see TODOs in code).

## Troubleshooting

- **No video processing**: Ensure the agent has joined a call with video tracks available. The plugin automatically subscribes to video when tracks are added.
- **API errors**: Verify your `OPENAI_API_KEY` and `OPENAI_BASE_URL` are set correctly and the model name is valid.
- **High latency**: Consider reducing `fps` or `frame_buffer_seconds` to decrease the number of frames sent per request.
Empty file.
21 changes: 21 additions & 0 deletions plugins/openai/examples/qwen_vl_example/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[project]
name = "qwen3-vl-example"
version = "0.1.0"
description = "Example using Qwen3 VL hosted on Baseten with Vision Agents"
requires-python = ">=3.10"
dependencies = [
"vision-agents",
"vision-agents-plugins-openai",
"vision-agents-plugins-getstream",
"vision-agents-plugins-deepgram",
"vision-agents-plugins-elevenlabs",
"python-dotenv",
]

[tool.uv.sources]
vision-agents = { workspace = true }
vision-agents-plugins-openai = { workspace = true }
vision-agents-plugins-elevenlabs = { workspace = true }
vision-agents-plugins-getstream = { workspace = true }
vision-agents-plugins-deepgram = { workspace = true }

Loading
Loading