@mkmeral mkmeral commented Oct 20, 2025

Bidirectional Streaming Event System

Introduction

This document specifies a unified event system for bidirectional streaming in Strands Agents. Bidirectional streaming enables real-time, two-way audio conversations with AI models, supporting use cases like voice assistants, live customer service, and interactive audio applications.

Currently, Strands supports three bidirectional streaming providers:

  • OpenAI Realtime API - WebSocket/WebRTC based with detailed event lifecycle
  • Gemini Live API - Google's SDK-based approach with simplified event handling
  • Nova Sonic - AWS Bedrock's hierarchical event structure

Each provider has different event names, structures, and capabilities. This proposal defines a common event system that works consistently across all providers while preserving provider-specific capabilities through optional fields.

Goals

  1. Unified Interface - Single send() method for all input types instead of separate methods per modality
  2. Consistent Events - Same event names and structures across all providers
  3. Type Safety - Discriminated unions with type field for better IDE support and runtime validation
  4. Extensibility - Easy to add provider-specific features without breaking the core API
  5. Developer Experience - Well-documented parameters and clear examples

Scope

This proposal covers:

  • Core input events (audio, image, tool results)
  • Core output events (session lifecycle, audio/transcript streaming, tool calls, usage tracking)
  • Token usage tracking with modality breakdown
  • An extension point for provider-specific features (specific extensions deferred to future work)

Out of scope for initial implementation:

  • Text input (deferred to P1)
  • Manual interruption requests (deferred to P1, relying on automatic VAD)
  • Provider-specific extensions (VAD events, conversation management, etc.)

Event Summary

Input Events (3 types)

Sent via await session.send(event):

  1. AudioInputEvent - Send audio data to the model
  2. ImageInputEvent - Send image data to the model (Gemini only)
  3. ToolResultEvent - Send tool execution result back to the model (reuses strands.types._events.ToolResultEvent)

Output Events (10 types)

Received via async for event in session.receive_events():

  1. SessionStartEvent - Session established
  2. TurnStartEvent - Model starts generating response
  3. AudioStreamEvent - Streaming audio output
  4. TranscriptStreamEvent - Audio transcription (user or assistant)
  5. ToolUseStreamEvent - Model requests tool execution (streamed) (reuses strands.types._events.ToolUseStreamEvent)
  6. InterruptionEvent - Generation was interrupted
  7. TurnCompleteEvent - Model finished generating
  8. MultimodalUsage - Token usage with modality breakdown (extends TypedEvent with Usage fields)
  9. SessionEndEvent - Session terminated
  10. ErrorEvent - Error occurred (extends strands.types._events.ForceStopEvent pattern)

Base Classes

All bidirectional events extend strands.types._events.TypedEvent for consistency with the core Strands event system.


Input Events

All input events are sent via: await session.send(event)

AudioInputEvent

Send audio data to the model for processing.

Type Definition:

from typing import Literal, cast

from strands.types._events import TypedEvent

class AudioInputEvent(TypedEvent):
    def __init__(
        self,
        audio: bytes,
        format: Literal["pcm", "wav", "opus", "mp3"],
        sample_rate: Literal[16000, 24000, 48000],
        channels: Literal[1, 2]
    ):
        super().__init__({
            "type": "bidirectional_",
            "audio": audio,
            "format": format,
            "sample_rate": sample_rate,
            "channels": channels
        })
    
    @property
    def audio(self) -> bytes:
        return cast(bytes, self.get("audio"))
    
    @property
    def format(self) -> str:
        return cast(str, self.get("format"))
    
    @property
    def sample_rate(self) -> int:
        return cast(int, self.get("sample_rate"))
    
    @property
    def channels(self) -> int:
        return cast(int, self.get("channels"))

Parameters:

  • audio: Raw audio data as bytes (not base64 encoded)
  • format: Audio encoding format
    • pcm: Raw PCM (Pulse Code Modulation) - uncompressed audio
    • wav: WAV file format (typically contains PCM)
    • opus: Opus codec - compressed, good for speech
    • mp3: MP3 codec - compressed, widely supported
  • sample_rate: Number of audio samples per second (Hz)
    • 16000: 16kHz - standard for speech recognition
    • 24000: 24kHz - higher quality speech
    • 48000: 48kHz - high quality audio
  • channels: Number of audio channels
    • 1: Mono (single channel) - recommended for speech
    • 2: Stereo (dual channel) - for spatial audio

Example:

event = AudioInputEvent(
    audio=audio_bytes,
    format="pcm",
    sample_rate=16000,
    channels=1
)
await session.send(event)

Provider Implementation:

| Provider | Implementation | Notes |
|----------|----------------|-------|
| OpenAI | input_audio_buffer.append | Requires base64 encoding (handled internally) |
| Gemini | send_realtime_input(audio=Blob(...)) | Uses SDK's Blob type |
| Nova Sonic | audioInput event | Requires contentStart before first chunk |
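For reference, a minimal capture loop that feeds the model 100 ms chunks of 16 kHz mono PCM. This is a sketch that assumes PyAudio for microphone access and an open session; the chunk size and function name are illustrative, not part of this spec.

import asyncio

import pyaudio  # assumption: PyAudio is used here purely for illustration

CHUNK_FRAMES = 1600  # 100 ms of audio at 16 kHz

async def stream_microphone(session) -> None:
    """Capture 16 kHz mono PCM from the default microphone and send it to the session."""
    pa = pyaudio.PyAudio()
    stream = pa.open(format=pyaudio.paInt16, channels=1, rate=16000,
                     input=True, frames_per_buffer=CHUNK_FRAMES)
    try:
        while True:
            # Blocking read; a production app would run capture in a thread or executor.
            chunk = stream.read(CHUNK_FRAMES, exception_on_overflow=False)
            await session.send(AudioInputEvent(
                audio=chunk, format="pcm", sample_rate=16000, channels=1))
            await asyncio.sleep(0)  # yield so the receive loop can run
    finally:
        stream.stop_stream()
        stream.close()
        pa.terminate()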

ImageInputEvent

Send image data to the model.

Type Definition:

class ImageInputEvent(TypedEvent):
    def __init__(
        self,
        image: Union[bytes, str],
        mime_type: str,
        encoding: Literal["base64", "raw"]
    ):
        super().__init__({
            "type": "bidirectional_image_input",
            "image": image,
            "mime_type": mime_type,
            "encoding": encoding
        })
    
    @property
    def image(self) -> Union[bytes, str]:
        return cast(Union[bytes, str], self.get("image"))
    
    @property
    def mime_type(self) -> str:
        return cast(str, self.get("mime_type"))
    
    @property
    def encoding(self) -> str:
        return cast(str, self.get("encoding"))

Parameters:

  • image: Image data, either raw bytes or base64-encoded string
  • mime_type: MIME type of the image
    • image/jpeg: JPEG format
    • image/png: PNG format
    • image/gif: GIF format
    • image/webp: WebP format
  • encoding: How the image data is encoded
    • raw: Raw bytes (binary data)
    • base64: Base64-encoded string

Example:

event = ImageInputEvent(
    image=image_bytes,
    mime_type="image/jpeg",
    encoding="raw"
)
await session.send(event)

Provider Implementation:

| Provider | Implementation | Notes |
|----------|----------------|-------|
| OpenAI | Not supported | Raises NotImplementedError |
| Gemini | send() with inline_data | Accepts both base64 and raw bytes |
| Nova Sonic | Not supported | Raises NotImplementedError |
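As an illustration, a sketch that sends the same JPEG frame using each encoding; the file path, helper name, and session object are placeholders.

import base64
from pathlib import Path

async def send_frame(session, path: str = "frame.jpg") -> None:
    image_bytes = Path(path).read_bytes()

    # Raw bytes: the provider adapter applies whatever encoding it needs internally.
    await session.send(ImageInputEvent(
        image=image_bytes, mime_type="image/jpeg", encoding="raw"))

    # Base64 string: useful when the frame arrives already encoded upstream.
    encoded = base64.b64encode(image_bytes).decode("ascii")
    await session.send(ImageInputEvent(
        image=encoded, mime_type="image/jpeg", encoding="base64"))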

ToolResultEvent

Reuses: strands.types._events.ToolResultEvent

Send tool execution result back to the model.

Type Definition:

from strands.types._events import ToolResultEvent
from strands.types.tools import ToolResult

# ToolResult structure:
# {
#     "toolUseId": str,
#     "content": List[ContentBlock],
#     "status": Optional[Literal["success", "error"]]
# }

Parameters:

  • tool_result: ToolResult object containing:
    • toolUseId: Unique identifier matching the tool use request from the model
    • content: List of content blocks with the tool execution result
    • status: Optional status indicator ("success" or "error")

Example:

from strands.types._events import ToolResultEvent
from strands.types.tools import ToolResult

tool_result: ToolResult = {
    "toolUseId": "toolu_abc123",
    "content": [{"text": '{"temperature": 72, "conditions": "sunny"}'}],
    "status": "success"
}
event = ToolResultEvent(tool_result)
await session.send(event)
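A failed tool call uses the same event with an error status; a short sketch:

error_result: ToolResult = {
    "toolUseId": "toolu_abc123",
    "content": [{"text": "Weather service unavailable"}],
    "status": "error"
}
await session.send(ToolResultEvent(error_result))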

Provider Implementation:

| Provider | Implementation | Notes |
|----------|----------------|-------|
| OpenAI | conversation.item.create with function_call_output | Uses call_id field name |
| Gemini | send_tool_response(function_responses=[...]) | Uses FunctionResponse SDK type |
| Nova Sonic | toolResult event | Requires contentStart wrapper |

Note: This reuses the existing Strands ToolResultEvent for consistency with the core agent system.


Output Events

All output events are received via: async for event in session.receive_events()

SessionStartEvent

Session established and ready for interaction.

Type Definition:

class SessionStartEvent(TypedEvent):
    def __init__(self, session_id: str, model: str, capabilities: List[str]):
        super().__init__({
            "type": "bidirectional_session_start",
            "session_id": session_id,
            "model": model,
            "capabilities": capabilities
        })
    
    @property
    def session_id(self) -> str:
        return cast(str, self.get("session_id"))
    
    @property
    def model(self) -> str:
        return cast(str, self.get("model"))
    
    @property
    def capabilities(self) -> List[str]:
        return cast(List[str], self.get("capabilities"))

Parameters:

  • session_id: Unique identifier for this session
  • model: Model identifier (e.g., "gpt-realtime", "gemini-2.0-flash-live")
  • capabilities: List of supported features (e.g., ["audio", "tools", "images"])

Example:

# Received from session
async for event in session.receive_events():
    if isinstance(event, SessionStartEvent):
        print(f"Session {event.session_id} started with {event.model}")

Provider Implementation:

| Provider | Source Event | Notes |
|----------|--------------|-------|
| OpenAI | session.created | Provides full session config |
| Gemini | Synthesized on connection | No explicit session event |
| Nova Sonic | sessionStart | Includes inference configuration |
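The capabilities list can be used to gate optional inputs before sending them; a sketch:

async for event in session.receive_events():
    if isinstance(event, SessionStartEvent):
        if "images" not in event.capabilities:
            # e.g. OpenAI Realtime and Nova Sonic: skip camera frames entirely
            print(f"{event.model} does not accept image input")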

TurnStartEvent

Model starts generating a response.

Type Definition:

class TurnStartEvent(TypedEvent):
    def __init__(self, turn_id: str):
        super().__init__({
            "type": "bidirectional_turn_start",
            "turn_id": turn_id
        })
    
    @property
    def turn_id(self) -> str:
        return cast(str, self.get("turn_id"))

Parameters:

  • turn_id: Unique identifier for this turn (matched by the corresponding TurnCompleteEvent)

Example:

async for event in session.receive_events():
    if isinstance(event, TurnStartEvent):
        print(f"Turn {event.turn_id} started")

Provider Implementation:

| Provider | Source Event | Notes |
|----------|--------------|-------|
| OpenAI | response.created | Explicit response lifecycle |
| Gemini | Detected from first content | No explicit turn start event |
| Nova Sonic | completionStart | Uses completionId as turnId |

AudioStreamEvent

Streaming audio output from the model.

Type Definition:

class AudioStreamEvent(TypedEvent):
    def __init__(
        self,
        audio: bytes,
        format: Literal["pcm", "wav", "opus", "mp3"],
        sample_rate: Literal[16000, 24000, 48000],
        channels: Literal[1, 2]
    ):
        super().__init__({
            "type": "bidirectional_audio_stream",
            "audio": audio,
            "format": format,
            "sample_rate": sample_rate,
            "channels": channels
        })
    
    @property
    def audio(self) -> bytes:
        return cast(bytes, self.get("audio"))
    
    @property
    def format(self) -> str:
        return cast(str, self.get("format"))
    
    @property
    def sample_rate(self) -> int:
        return cast(int, self.get("sample_rate"))
    
    @property
    def channels(self) -> int:
        return cast(int, self.get("channels"))

Parameters:

  • audio: Raw audio data as bytes (not base64 encoded)
  • format: Audio encoding format (see AudioInputEvent for details)
  • sample_rate: Number of audio samples per second in Hz
  • channels: Number of audio channels (1=mono, 2=stereo)

Example:

async for event in session.receive_events():
    if isinstance(event, AudioStreamEvent):
        play_audio(event.audio)

Provider Implementation:

| Provider | Source Event | Notes |
|----------|--------------|-------|
| OpenAI | response.audio.delta | Base64-encoded, we decode |
| Gemini | server_content.model_turn | Already raw bytes |
| Nova Sonic | audioOutput | Base64-encoded, we decode |
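A minimal playback path; a sketch that assumes PyAudio for local output and 16-bit PCM audio from the provider (the rate and channel count are taken from each event).

import pyaudio  # assumption: PyAudio used for local playback in this sketch

async def play_responses(session) -> None:
    pa = pyaudio.PyAudio()
    player = None
    try:
        async for event in session.receive_events():
            if isinstance(event, AudioStreamEvent):
                if player is None:
                    # Lazily open the output stream with the parameters the provider reports.
                    player = pa.open(format=pyaudio.paInt16, channels=event.channels,
                                     rate=event.sample_rate, output=True)
                player.write(event.audio)  # blocking write; acceptable for a demo
    finally:
        if player is not None:
            player.close()
        pa.terminate()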

TranscriptStreamEvent

Audio transcription of speech (user or assistant).

Type Definition:

class TranscriptStreamEvent(TypedEvent):
    def __init__(
        self,
        text: str,
        source: Literal["user", "assistant"],
        is_final: bool
    ):
        super().__init__({
            "type": "bidirectional_transcript_stream",
            "text": text,
            "source": source,
            "is_final": is_final
        })
    
    @property
    def text(self) -> str:
        return cast(str, self.get("text"))
    
    @property
    def source(self) -> str:
        return cast(str, self.get("source"))
    
    @property
    def is_final(self) -> bool:
        return cast(bool, self.get("is_final"))

Parameters:

  • text: Transcribed text from audio
  • source: Who is speaking
    • user: Transcription of user's speech input
    • assistant: Transcription of model's audio output
  • is_final: Whether this is the final/complete transcript
    • true: Complete, final transcription
    • false: Partial/incremental transcription (more may follow)

Example:

async for event in session.receive_events():
    if isinstance(event, TranscriptStreamEvent):
        print(f"{event.source}: {event.text} (final={event.is_final})")

Provider Implementation:

| Provider | Source Event | Notes |
|----------|--------------|-------|
| OpenAI | response.audio_transcript.delta (assistant); conversation.item.input_audio_transcription.delta (user) | Separate events for input/output |
| Gemini | server_content.turn_complete (user); server_content.model_turn (assistant) | Provided in server_content |
| Nova Sonic | textOutput events | Only emits transcripts, not separate text responses |

Important: Nova Sonic does not return separate text responses. The textOutput events are transcripts of the audio conversation. For consistency, all providers surface text representations of speech as TranscriptStreamEvent.
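Partial transcripts can be buffered until is_final arrives; a sketch, assuming partial events carry incremental fragments rather than cumulative snapshots:

partial = {"user": "", "assistant": ""}

async for event in session.receive_events():
    if isinstance(event, TranscriptStreamEvent):
        if event.is_final:
            # Flush the buffered fragments plus the final chunk as one line.
            print(f"{event.source}: {partial[event.source] + event.text}")
            partial[event.source] = ""
        else:
            partial[event.source] += event.text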


ToolUseStreamEvent

Reuses: strands.types._events.ToolUseStreamEvent

Model requests tool execution, streamed incrementally.

Type Definition:

from strands.types._events import ToolUseStreamEvent
from strands.types.streaming import ContentBlockDelta

# Already defined in strands.types._events:
# class ToolUseStreamEvent(ModelStreamEvent):
#     def __init__(self, delta: ContentBlockDelta, current_tool_use: dict[str, Any]):
#         super().__init__({"delta": delta, "current_tool_use": current_tool_use})

# ContentBlockDelta structure:
# {
#     "toolUse": {
#         "input": str  # JSON string fragment
#     }
# }

# current_tool_use accumulates the complete state:
# {
#     "toolUseId": str,
#     "name": str,
#     "input": str  # Accumulated JSON string (parse when complete)
# }

Parameters:

  • delta: ContentBlockDelta containing incremental tool use input
    • toolUse.input: JSON string fragment being streamed
  • current_tool_use: Accumulated tool use state
    • toolUseId: Unique identifier for this tool use
    • name: Name of the tool to execute
    • input: Accumulated JSON string (parse when complete)

Example:

import json

from strands.types._events import ToolResultEvent, ToolUseStreamEvent
from strands.types.tools import ToolResult

async for event in session.receive_events():
    if isinstance(event, ToolUseStreamEvent):
        # Access incremental delta
        delta = event.get("delta", {})
        tool_delta = delta.get("toolUse", {})
        input_fragment = tool_delta.get("input", "")
        
        # Access accumulated state
        current = event.get("current_tool_use", {})
        tool_use_id = current.get("toolUseId")
        name = current.get("name")
        accumulated_input = current.get("input", "")
        
        # Check if complete (is_complete and execute_tool below are
        # application-defined helpers, not part of this spec)
        # When complete, parse and execute:
        if is_complete(accumulated_input):
            tool_input = json.loads(accumulated_input)
            result = execute_tool(name, tool_input)
            
            # Send result
            tool_result: ToolResult = {
                "toolUseId": tool_use_id,
                "content": [{"text": json.dumps(result)}],
                "status": "success"
            }
            await session.send(ToolResultEvent(tool_result))

Provider Implementation:

| Provider | Source Event | Streaming Behavior |
|----------|--------------|--------------------|
| OpenAI | response.function_call_arguments.delta | Streams - Multiple deltas with input fragments |
| Gemini | message.tool_call.function_calls | Single delta - One event with complete input |
| Nova Sonic | toolUse | Single delta - One event with complete input |

Note: This reuses the existing Strands ToolUseStreamEvent directly. For providers that stream (OpenAI), multiple events are emitted with incremental deltas. For providers that don't stream (Gemini, Nova Sonic), a single event is emitted with the complete tool use as the delta. This provides a unified streaming interface while accommodating different provider behaviors.
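The same receive loop can therefore serve both behaviors by tracking the accumulated state and executing once the turn signals tool use. In the sketch below, run_tool is an application-defined helper, and using TurnCompleteEvent with stop_reason "tool_use" as the completion signal is an assumption of this example.

import json

pending = {}  # toolUseId -> accumulated tool use state

async for event in session.receive_events():
    if isinstance(event, ToolUseStreamEvent):
        current = event.get("current_tool_use", {})
        if current.get("toolUseId"):
            pending[current["toolUseId"]] = current
    elif isinstance(event, TurnCompleteEvent) and event.stop_reason == "tool_use":
        for tool_use_id, current in pending.items():
            tool_input = json.loads(current.get("input") or "{}")
            result = run_tool(current["name"], tool_input)  # application-defined
            await session.send(ToolResultEvent({
                "toolUseId": tool_use_id,
                "content": [{"text": json.dumps(result)}],
                "status": "success"
            }))
        pending.clear()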


InterruptionEvent

Model generation was interrupted.

Type Definition:

class InterruptionEvent(TypedEvent):
    def __init__(
        self,
        reason: Literal["user_speech", "error"],
        turn_id: Optional[str] = None
    ):
        super().__init__({
            "type": "bidirectional_interruption",
            "reason": reason,
            "turn_id": turn_id
        })
    
    @property
    def reason(self) -> str:
        return cast(str, self.get("reason"))
    
    @property
    def turn_id(self) -> Optional[str]:
        return cast(Optional[str], self.get("turn_id"))

Parameters:

  • reason: Why the interruption occurred
    • user_speech: User started speaking (detected by VAD)
    • error: Interruption due to an error condition
  • turn_id: ID of the turn that was interrupted (may be None)

Example:

async for event in session.receive_events():
    if isinstance(event, InterruptionEvent):
        print(f"Interrupted: {event.reason}")
        clear_audio_buffer()

Provider Implementation:

| Provider | Source Event | Notes |
|----------|--------------|-------|
| OpenAI | input_audio_buffer.speech_started | Detected via VAD |
| Gemini | server_content.interrupted | Explicit interrupted flag |
| Nova Sonic | stopReason: "INTERRUPTED" in contentEnd | Uses stopReason field |

Note: Manual interruption requests are deferred to P1. For now, interruptions are detected automatically by provider VAD systems.


TurnCompleteEvent

Model finished generating response.

Type Definition:

class TurnCompleteEvent(TypedEvent):
    def __init__(
        self,
        turn_id: str,
        stop_reason: Literal["complete", "interrupted", "tool_use", "error"]
    ):
        super().__init__({
            "type": "bidirectional_turn_complete",
            "turn_id": turn_id,
            "stop_reason": stop_reason
        })
    
    @property
    def turn_id(self) -> str:
        return cast(str, self.get("turn_id"))
    
    @property
    def stop_reason(self) -> str:
        return cast(str, self.get("stop_reason"))

Parameters:

  • turn_id: ID of the turn that completed (matches the preceding TurnStartEvent)
  • stop_reason: Why the turn ended
    • complete: Model finished generating naturally
    • interrupted: Turn was interrupted
    • tool_use: Model is requesting tool execution
    • error: Turn ended due to an error

Example:

async for event in session.receive_events():
    if isinstance(event, TurnCompleteEvent):
        print(f"Turn {event.turn_id} completed: {event.stop_reason}")

Provider Implementation:

| Provider | Source Event | Notes |
|----------|--------------|-------|
| OpenAI | response.done | Detailed status mapping |
| Gemini | server_content.turn_complete or generation_complete | Two possible events |
| Nova Sonic | completionEnd | Uses stopReason field |

MultimodalUsage

Extends: strands.types._events.TypedEvent (reuses the field names of strands.types.event_loop.Usage)

Token usage event with modality breakdown for multimodal streaming.

Type Definition:

from strands.types._events import TypedEvent
from strands.types.event_loop import Usage
from typing import TypedDict, Literal, List, cast

class ModalityUsage(TypedDict):
    """Token usage for a specific modality"""
    modality: Literal["text", "audio", "image", "cached"]
    input_tokens: int
    output_tokens: int

class MultimodalUsage(TypedEvent):
    """Event emitted when usage information is updated during streaming.
    
    Combines TypedEvent behavior with Usage fields for a unified event type.
    """
    
    def __init__(
        self,
        input_tokens: int,
        output_tokens: int,
        total_tokens: int,
        modality_details: List[ModalityUsage] | None = None,
        cache_read_input_tokens: int | None = None,
        cache_write_input_tokens: int | None = None
    ):
        data = {
            "type": "multimodal_usage",
            "inputTokens": input_tokens,
            "outputTokens": output_tokens,
            "totalTokens": total_tokens,
        }
        if modality_details is not None:
            data["modality_details"] = modality_details
        if cache_read_input_tokens is not None:
            data["cacheReadInputTokens"] = cache_read_input_tokens
        if cache_write_input_tokens is not None:
            data["cacheWriteInputTokens"] = cache_write_input_tokens
        super().__init__(data)
    
    @property
    def input_tokens(self) -> int:
        return cast(int, self.get("inputTokens"))
    
    @property
    def output_tokens(self) -> int:
        return cast(int, self.get("outputTokens"))
    
    @property
    def total_tokens(self) -> int:
        return cast(int, self.get("totalTokens"))
    
    @property
    def modality_details(self) -> List[ModalityUsage]:
        return cast(List[ModalityUsage], self.get("modality_details", []))
    
    @property
    def cache_read_input_tokens(self) -> int | None:
        return cast(int | None, self.get("cacheReadInputTokens"))
    
    @property
    def cache_write_input_tokens(self) -> int | None:
        return cast(int | None, self.get("cacheWriteInputTokens"))

Parameters:

  • input_tokens (required): Total tokens used for all input modalities
  • output_tokens (required): Total tokens used for all output modalities
  • total_tokens (required): Sum of input and output tokens
  • modality_details (optional): List of token usage per modality
    • modality: Type of content (text, audio, image, cached)
    • input_tokens: Tokens used for this modality's input
    • output_tokens: Tokens used for this modality's output
  • cache_read_input_tokens (optional): Tokens read from cache
  • cache_write_input_tokens (optional): Tokens written to cache

Example:

async for event in session.receive_events():
    if isinstance(event, MultimodalUsage):
        # Access standard Usage fields
        print(f"Total: {event.total_tokens} tokens")
        print(f"  Input: {event.input_tokens}")
        print(f"  Output: {event.output_tokens}")
        
        # Access modality breakdown
        if event.modality_details:
            print("Modality breakdown:")
            for detail in event.modality_details:
                print(f"  {detail['modality']}: {detail['input_tokens']} in, {detail['output_tokens']} out")

Provider Implementation:

| Provider | Source Event | Modality Support |
|----------|--------------|------------------|
| OpenAI | rate_limits.updated or response usage | Full breakdown: text, audio, cached |
| Gemini | usage_metadata in response | May only provide totals |
| Nova Sonic | usageEvent | Separates speech vs text tokens |

Notes:

  • Direct event type (not wrapped) - simpler to use and emit
  • Includes type: "multimodal_usage" for event discrimination
  • Compatible with existing Strands Usage structure (same field names)
  • Not all providers give modality breakdown - modality_details may be empty
  • Cached tokens (OpenAI) represent tokens that were not re-processed
  • Emitted periodically during conversation
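Whether successive reports are cumulative or incremental is provider-dependent, so a simple approach is to keep the most recent event and read the breakdown from it; a sketch:

latest_usage = None

async for event in session.receive_events():
    if isinstance(event, MultimodalUsage):
        latest_usage = event  # keep only the most recent report

if latest_usage is not None:
    print(f"Input: {latest_usage.input_tokens}, output: {latest_usage.output_tokens}")
    for detail in latest_usage.modality_details:
        print(f"  {detail['modality']}: {detail['input_tokens']} in, {detail['output_tokens']} out")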

SessionEndEvent

Session terminated.

Type Definition:

class SessionEndEvent(TypedEvent):
    def __init__(self, reason: Literal["client_disconnect", "timeout", "error", "complete"]):
        super().__init__({
            "type": "bidirectional_session_end",
            "reason": reason
        })
    
    @property
    def reason(self) -> str:
        return cast(str, self.get("reason"))

Parameters:

  • reason: Why the session ended
    • client_disconnect: Client closed the connection
    • timeout: Session timed out
    • error: Session ended due to error
    • complete: Session completed normally

Example:

async for event in session.receive_events():
    if isinstance(event, SessionEndEvent):
        print(f"Session ended: {event.reason}")
        break

Provider Implementation:

| Provider | Source Event | Notes |
|----------|--------------|-------|
| OpenAI | Synthesized on close | No explicit event |
| Gemini | Synthesized on context exit | No explicit event |
| Nova Sonic | sessionEnd | Explicit event |

ErrorEvent

Extends: strands.types._events.ForceStopEvent pattern

Error occurred during the session.

Type Definition:

from typing import Any, Dict, Optional, cast

from strands.types._events import TypedEvent

class ErrorEvent(TypedEvent):
    """Event emitted when an error occurs during bidirectional streaming"""
    
    def __init__(
        self,
        error: Exception,
        code: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None
    ):
        super().__init__({
            "bidirectional_error": True,
            "error": error,
            "error_message": str(error),
            "error_code": code or type(error).__name__,
            "error_details": details
        })
    
    @property
    def error(self) -> Exception:
        return cast(Exception, self.get("error"))
    
    @property
    def code(self) -> str:
        return cast(str, self.get("error_code"))
    
    @property
    def message(self) -> str:
        return cast(str, self.get("error_message"))
    
    @property
    def details(self) -> Optional[Dict[str, Any]]:
        return cast(Optional[Dict[str, Any]], self.get("error_details"))

Parameters:

  • error: The exception that occurred
  • code: Optional error code for programmatic handling (defaults to exception class name)
  • details: Optional additional error information

Example:

async for event in session.receive_events():
    if isinstance(event, ErrorEvent):
        print(f"Error {event.code}: {event.message}")
        if event.details:
            print(f"Details: {event.details}")

Provider Implementation:

| Provider | Source Event | Notes |
|----------|--------------|-------|
| OpenAI | error event | Structured error events |
| Gemini | Synthesized from exceptions | Catch and convert |
| Nova Sonic | Synthesized from error responses | Convert to standard format |

Note: Follows the pattern of ForceStopEvent which accepts exceptions, maintaining consistency with core Strands error handling.


Deferred Features

The following features exist in some providers but are deferred to future work:

Text Input (P1)

Direct text input without audio. Deferred to focus on audio-first interactions.

Manual Interruption (P1)

Client-initiated interruption via InterruptRequest event. For now, relying on automatic VAD-based interruption.

Voice Activity Detection Events (P2)

Explicit VAD events (voice.activity) for speech start/stop detection. Available in OpenAI and Gemini.

Conversation Management (P2)

Conversation item management (conversation.item.* events). Available in OpenAI only.

Rate Limiting (P2)

Rate limit information (rate.limit events). Available in OpenAI only.

Thinking Mode (P2)

Thinking mode events (thinking.*). Available in Gemini only.

MCP Support (P2)

Model Context Protocol events (mcp.*). Available in OpenAI only.

Content Lifecycle (P2)

Hierarchical content structure events (content.block.*). Available in Nova Sonic only.


Implementation Notes

Type Safety

All events extend strands.types._events.TypedEvent base class and use discriminated unions:

from strands.types._events import TypedEvent, ToolResultEvent, ToolUseStreamEvent

# Input events
InputEvent = Union[AudioInputEvent, ImageInputEvent, ToolResultEvent]

# Output events
OutputEvent = Union[
    SessionStartEvent, TurnStartEvent, AudioStreamEvent, TranscriptStreamEvent,
    ToolUseStreamEvent, InterruptionEvent, TurnCompleteEvent, MultimodalUsage,
    SessionEndEvent, ErrorEvent
]

This enables type narrowing with isinstance() checks and provides clean property-based access.
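A condensed receive loop showing the isinstance-based narrowing; play_audio, show_caption, and clear_audio_buffer are application-defined helpers, not part of this spec:

async def handle_output(session) -> None:
    async for event in session.receive_events():
        if isinstance(event, AudioStreamEvent):
            play_audio(event.audio)                 # application-defined playback
        elif isinstance(event, TranscriptStreamEvent):
            show_caption(event.source, event.text)  # application-defined UI hook
        elif isinstance(event, InterruptionEvent):
            clear_audio_buffer()                    # stop local playback immediately
        elif isinstance(event, (SessionEndEvent, ErrorEvent)):
            break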

Reused Components

The following components are reused from core Strands:

  • TypedEvent (strands.types._events.TypedEvent) - Base class for all events
  • ToolResultEvent (strands.types._events.ToolResultEvent) - Tool result handling
  • Usage fields (strands.types.event_loop.Usage) - Field names reused in MultimodalUsage event
  • ToolResult (strands.types.tools.ToolResult) - Tool result structure
  • Error handling pattern - Following ForceStopEvent approach

New Bidirectional-Specific Events

The following events are unique to bidirectional streaming:

  • AudioInputEvent / AudioStreamEvent - Audio I/O
  • ImageInputEvent - Image input
  • SessionStartEvent / SessionEndEvent - Session lifecycle
  • TurnStartEvent / TurnCompleteEvent - Turn-based interaction
  • TranscriptStreamEvent - Audio transcription
  • InterruptionEvent - Interruption handling
  • MultimodalUsage - Usage event with modality breakdown
  • ErrorEvent - Bidirectional-specific error handling

Constants

SUPPORTED_AUDIO_FORMATS = ["pcm", "wav", "opus", "mp3"]
SUPPORTED_SAMPLE_RATES = [16000, 24000, 48000]
SUPPORTED_CHANNELS = [1, 2]
DEFAULT_SAMPLE_RATE = 16000
DEFAULT_CHANNELS = 1
DEFAULT_FORMAT = "pcm"

mehtarac and others added 18 commits October 6, 2025 06:11
feat(bidirectional_streaming): Add experimental bidirectional streaming MVP POC implementation
Sync fork with main branch of sdk-python
- Add input_audio_transcription and output_audio_transcription parameter pass-through in _build_live_config()
- These parameters enable real-time transcription of both user speech (input) and model audio responses (output)
- Remove debug logging and temporary debug files (gemini_live_events.jsonl, debug_transcripts.py)
- Clean up unused json import

The transcription parameters were being set in the test configuration but weren't being passed through to the SDK because _build_live_config() only handled specific parameters. Now transcription events will be properly emitted via the transcript event type.
Instead of cherry-picking specific parameters, just pass through all config from params directly to the SDK. This is simpler and more flexible - users can configure any Gemini Live API parameter without us having to explicitly handle each one.

The previous approach was unnecessarily complicated with manual parameter filtering.
- Add proper error logging in close() method
- Remove empty line in send_tool_result() try block
- Add newline at end of file
- Improve code consistency
- Add GeminiLiveBidirectionalModel and GeminiLiveSession to models __init__.py
- Add ImageInputEvent and TranscriptEvent to types __init__.py
- Ensures new types and model are properly exported for external use
…Gemini Live)

- OpenAI Realtime model provider with function calling fixes
- Gemini Live model provider with video support
- Combined event types: UsageMetricsEvent, VoiceActivityEvent, TranscriptEvent, ImageInputEvent
- All three providers now available: NovaSonic, OpenAI Realtime, Gemini Live
… OpenAI)

- Supports switching between providers via --provider flag
- Includes video/camera support for Gemini Live
- Command-line arguments for duration and camera control
- Unified event handling for all provider types

Usage:
  python test_bidirectional_streaming.py --provider gemini
  python test_bidirectional_streaming.py --provider nova
  python test_bidirectional_streaming.py --provider openai --no-camera
- Created EventLogger utility for structured event logging
- Logs both incoming (from provider) and outgoing (to provider) events
- Truncates long strings (base64 audio/images) to 100 chars for readability
- Saves events to JSONL files in event_logs/ directory
- Added logging to all three providers:
  - Gemini Live: raw events + audio/text/image inputs
  - Nova Sonic: raw events + audio/text inputs
  - OpenAI Realtime: raw events + audio/text inputs

Event logs include:
- Timestamp and sequence number
- Provider name and direction (incoming/outgoing)
- Event type and truncated data
- Useful for comparing event structures across providers
Explains how to use event logs to compare provider event structures