22import base64
33import json
44import logging
5+ import uuid
56from typing import Optional , List , Dict , Any
67from getstream .video .rtc .audio_track import AudioStreamTrack
78from getstream .video .rtc .track_util import PcmData
2324DEFAULT_SAMPLE_RATE = 16000
2425
2526
27+ """
28+ TODO:
29+ - audio sending 3-4 functions
30+ - connect method
31+ - process response loop
32+ """
33+
34+
2635class Realtime (realtime .Realtime ):
2736 """
2837 Realtime on AWS Bedrock with support for audio/video streaming.
@@ -65,37 +74,26 @@ class Realtime(realtime.Realtime):
6574 await llm.close()
6675 """
6776 connected : bool = False
68-
69- async def _close_impl (self ):
70- pass
77+ voice_id : str
7178
7279 def __init__ (
73- self ,
74- model : str = DEFAULT_MODEL ,
75- region_name : str = "us-east-1" ,
76- aws_access_key_id : Optional [str ] = None ,
77- aws_secret_access_key : Optional [str ] = None ,
78- aws_session_token : Optional [str ] = None ,
79- sample_rate : int = 16000 ,
80- ** kwargs
80+ self ,
81+ model : str = DEFAULT_MODEL ,
82+ region_name : str = "us-east-1" ,
83+ aws_access_key_id : Optional [str ] = None ,
84+ aws_secret_access_key : Optional [str ] = None ,
85+ aws_session_token : Optional [str ] = None ,
86+ sample_rate : int = 16000 ,
87+ ** kwargs
8188 ) -> None :
8289 """
83- Initialize Bedrock Realtime with Nova Sonic.
84-
85- Args:
86- model: The Bedrock model ID (default: us.amazon.nova-sonic-v1:0)
87- region_name: AWS region name (default: us-east-1)
88- aws_access_key_id: Optional AWS access key ID
89- aws_secret_access_key: Optional AWS secret access key
90- aws_session_token: Optional AWS session token
91- sample_rate: Audio sample rate in Hz (default: 16000)
92- **kwargs: Additional arguments passed to parent class
90+
9391 """
9492 super ().__init__ (** kwargs )
9593 self .model = model
9694 self .region_name = region_name
9795 self .sample_rate = sample_rate
98-
96+
9997 # Initialize Bedrock Runtime client with SDK
10098 config = Config (
10199 endpoint_uri = f"https://bedrock-runtime.{ region_name } .amazonaws.com" ,
@@ -104,23 +102,145 @@ def __init__(
104102 )
105103 self .client = BedrockRuntimeClient (config = config )
106104 self .logger = logging .getLogger (__name__ )
107-
105+
108106 # Audio output track - Bedrock typically outputs at 16kHz
109107 self .output_track = AudioStreamTrack (
110108 framerate = sample_rate , stereo = False , format = "s16"
111109 )
112-
110+
113111 self ._video_forwarder : Optional [VideoForwarder ] = None
114112 self ._stream_task : Optional [asyncio .Task [Any ]] = None
115113 self ._is_connected = False
116114 self ._message_queue : asyncio .Queue [Dict [str , Any ]] = asyncio .Queue ()
117115 self ._conversation_messages : List [Dict [str , Any ]] = []
118116 self ._pending_tool_uses : Dict [int , Dict [str , Any ]] = {} # Track tool calls across stream events
119-
117+
120118 # Audio streaming configuration
121119 self .prompt_name = "default_prompt"
122120 self .audio_content_name = "audio_input"
123121
122+ async def content_input (self , content : str , role : str ):
123+ """
124+ For text input Nova expects content start, text input and then content end
125+ This method wraps the 3 events in one operation
126+ """
127+ content_name = str (uuid .uuid4 ())
128+ await self .content_start (content_name , role )
129+ await self .text_input (content_name , content )
130+ await self .content_end (content_name )
131+
132+ async def start_session (self ):
133+ # subclass this to change the session start
134+ event = {
135+ "event" : {
136+ "sessionStart" : {
137+ "inferenceConfiguration" : {
138+ "maxTokens" : 1024 ,
139+ "topP" : 0.9 ,
140+ "temperature" : 0.7
141+ }
142+ }
143+ }
144+ }
145+
146+ await self .send_event (event )
147+
148+ async def start_prompt (self ):
149+ prompt_name = self .session_id
150+ event = {
151+ "event" : {
152+ "promptStart" : {
153+ "promptName" : prompt_name ,
154+ "textOutputConfiguration" : {
155+ "mediaType" : "text/plain"
156+ },
157+ "audioOutputConfiguration" : {
158+ "mediaType" : "audio/lpcm" ,
159+ "sampleRateHertz" : 24000 ,
160+ "sampleSizeBits" : 16 ,
161+ "channelCount" : 1 ,
162+ "voiceId" : self .voice_id ,
163+ "encoding" : "base64" ,
164+ "audioType" : "SPEECH"
165+ }
166+ }
167+ }
168+ }
169+ await self .send_event (event )
170+
171+
172+
173+ async def content_start (self , content_name : str , role : str ):
174+ event = {
175+ "event" : {
176+ "contentStart" : {
177+ "promptName" : self .session_id ,
178+ "contentName" : content_name ,
179+ "type" : "TEXT" ,
180+ "interactive" : False ,
181+ "role" : role ,
182+ "textInputConfiguration" : {
183+ "mediaType" : "text/plain"
184+ }
185+ }
186+ }
187+ }
188+ await self .send_event (event )
189+
190+ async def text_input (self , content_name : str , content : str ):
191+ event = {
192+ "event" : {
193+ "textInput" : {
194+ "promptName" : self .session_id ,
195+ "contentName" : content_name ,
196+ "content" : content ,
197+ }
198+ }
199+ }
200+ await self .send_event (event )
201+
202+ async def content_end (self , content_name : str ):
203+ event = {
204+ "event" : {
205+ "contentEnd" : {
206+ "promptName" : self .session_id ,
207+ "contentName" : content_name ,
208+ }
209+ }
210+ }
211+ await self .send_event (event )
212+
213+ async def send_event (self , event : Dict [str , Any ]) -> None :
214+ event_json = json .dumps (event )
215+ event = InvokeModelWithBidirectionalStreamInputChunk (
216+ value = BidirectionalInputPayloadPart (bytes_ = event_json .encode ('utf-8' ))
217+ )
218+ await self .stream .input_stream .send (event )
219+
220+ async def _close_impl (self ):
221+ if not self .connected :
222+ return
223+
224+ prompt_end = {
225+ "event" : {
226+ "promptEnd" : {
227+ "promptName" : self .session_id ,
228+ }
229+ }
230+ }
231+ await self .send_event (prompt_end )
232+
233+ session_end = {
234+ "event" : {
235+ "sessionEnd" : {}
236+ }
237+ }
238+ await self .send_event (session_end )
239+
240+ await self .stream .input_stream .close ()
241+
242+
243+
124244 async def connect (self ):
125245 """To connect we need to do a few things
126246
@@ -156,12 +276,7 @@ async def connect(self):
156276 # audio input is always on
157277 await self .start_audio_input ()
158278
159- async def send_event (self , event_json ):
160- """Send an event to the stream."""
161- event = InvokeModelWithBidirectionalStreamInputChunk (
162- value = BidirectionalInputPayloadPart (bytes_ = event_json .encode ('utf-8' ))
163- )
164- await self .stream .input_stream .send (event )
279+
165280
166281 async def start_audio_input (self ):
167282 """Start audio input stream."""
0 commit comments