diff --git a/src/Custom/RealtimeConversation/RealtimeConversationSession.cs b/src/Custom/RealtimeConversation/RealtimeConversationSession.cs index 960f817e7..61e8c9eaa 100644 --- a/src/Custom/RealtimeConversation/RealtimeConversationSession.cs +++ b/src/Custom/RealtimeConversation/RealtimeConversationSession.cs @@ -20,8 +20,8 @@ public partial class RealtimeConversationSession : IDisposable private readonly RealtimeConversationClient _parentClient; private readonly Uri _endpoint; private readonly ApiKeyCredential _credential; - private readonly object _sendingAudioLock = new(); - private bool _isSendingAudio = false; + private readonly SemaphoreSlim _audioSendSemaphore = new(1, 1); + private bool _isSendingAudioStream = false; internal bool ShouldBufferTurnResponseData { get; set; } @@ -47,13 +47,13 @@ protected internal RealtimeConversationSession( public virtual async Task SendInputAudioAsync(Stream audio, CancellationToken cancellationToken = default) { Argument.AssertNotNull(audio, nameof(audio)); - lock (_sendingAudioLock) + using (await _audioSendSemaphore.AutoReleaseWaitAsync(cancellationToken).ConfigureAwait(false)) { - if (_isSendingAudio) + if (_isSendingAudioStream) { throw new InvalidOperationException($"Only one stream of audio may be sent at once."); } - _isSendingAudio = true; + _isSendingAudioStream = true; } try { @@ -75,9 +75,9 @@ public virtual async Task SendInputAudioAsync(Stream audio, CancellationToken ca } finally { - lock (_sendingAudioLock) + using (await _audioSendSemaphore.AutoReleaseWaitAsync(cancellationToken).ConfigureAwait(false)) { - _isSendingAudio = false; + _isSendingAudioStream = false; } } } @@ -85,13 +85,13 @@ public virtual async Task SendInputAudioAsync(Stream audio, CancellationToken ca public virtual void SendInputAudio(Stream audio, CancellationToken cancellationToken = default) { Argument.AssertNotNull(audio, nameof(audio)); - lock (_sendingAudioLock) + using (_audioSendSemaphore.AutoReleaseWait(cancellationToken)) { - if (_isSendingAudio) + if (_isSendingAudioStream) { throw new InvalidOperationException($"Only one stream of audio may be sent at once."); } - _isSendingAudio = true; + _isSendingAudioStream = true; } try { @@ -113,9 +113,9 @@ public virtual void SendInputAudio(Stream audio, CancellationToken cancellationT } finally { - lock (_sendingAudioLock) + using (_audioSendSemaphore.AutoReleaseWait(cancellationToken)) { - _isSendingAudio = false; + _isSendingAudioStream = false; } } } @@ -130,18 +130,17 @@ public virtual void SendInputAudio(Stream audio, CancellationToken cancellationT public virtual async Task SendInputAudioAsync(BinaryData audio, CancellationToken cancellationToken = default) { Argument.AssertNotNull(audio, nameof(audio)); - lock (_sendingAudioLock) + using (await _audioSendSemaphore.AutoReleaseWaitAsync(cancellationToken).ConfigureAwait(false)) { - if (_isSendingAudio) + if (_isSendingAudioStream) { throw new InvalidOperationException($"Cannot send a standalone audio chunk while a stream is already in progress."); } - _isSendingAudio = true; + // TODO: consider automatically limiting/breaking size of chunk (as with streaming) + InternalRealtimeClientEventInputAudioBufferAppend internalCommand = new(audio); + BinaryData requestData = ModelReaderWriter.Write(internalCommand); + await SendCommandAsync(requestData, cancellationToken.ToRequestOptions()).ConfigureAwait(false); } - // TODO: consider automatically limiting/breaking size of chunk (as with streaming) - InternalRealtimeClientEventInputAudioBufferAppend internalCommand = new(audio); - BinaryData requestData = ModelReaderWriter.Write(internalCommand); - await SendCommandAsync(requestData, cancellationToken.ToRequestOptions()).ConfigureAwait(false); } /// @@ -154,18 +153,17 @@ public virtual async Task SendInputAudioAsync(BinaryData audio, CancellationToke public virtual void SendInputAudio(BinaryData audio, CancellationToken cancellationToken = default) { Argument.AssertNotNull(audio, nameof(audio)); - lock (_sendingAudioLock) + using (_audioSendSemaphore.AutoReleaseWait(cancellationToken)) { - if (_isSendingAudio) + if (_isSendingAudioStream) { throw new InvalidOperationException($"Cannot send a standalone audio chunk while a stream is already in progress."); } - _isSendingAudio = true; + // TODO: consider automatically limiting/breaking size of chunk (as with streaming) + InternalRealtimeClientEventInputAudioBufferAppend internalCommand = new(audio); + BinaryData requestData = ModelReaderWriter.Write(internalCommand); + SendCommand(requestData, cancellationToken.ToRequestOptions()); } - // TODO: consider automatically limiting/breaking size of chunk (as with streaming) - InternalRealtimeClientEventInputAudioBufferAppend internalCommand = new(audio); - BinaryData requestData = ModelReaderWriter.Write(internalCommand); - SendCommand(requestData, cancellationToken.ToRequestOptions()); } public virtual async Task ClearInputAudioAsync(CancellationToken cancellationToken = default) diff --git a/src/Utility/SemaphoreSlimExtensions.cs b/src/Utility/SemaphoreSlimExtensions.cs new file mode 100644 index 000000000..9dc9d5bba --- /dev/null +++ b/src/Utility/SemaphoreSlimExtensions.cs @@ -0,0 +1,58 @@ +using System; +using System.Diagnostics.Contracts; +using System.Threading; +using System.Threading.Tasks; + +namespace OpenAI; + +internal static class SemaphoreSlimExtensions +{ + public static async Task AutoReleaseWaitAsync( + this SemaphoreSlim semaphore, + CancellationToken cancellationToken = default) + { + Contract.Requires(semaphore != null); + var wrapper = new ReleaseableSemaphoreSlimWrapper(semaphore); + await semaphore.WaitAsync(cancellationToken); + return wrapper; + } + + public static IDisposable AutoReleaseWait( + this SemaphoreSlim semaphore, + CancellationToken cancellationToken = default) + { + Contract.Requires(semaphore != null); + var wrapper = new ReleaseableSemaphoreSlimWrapper(semaphore); + semaphore.Wait(cancellationToken); + return wrapper; + } + + private class ReleaseableSemaphoreSlimWrapper + : IDisposable + { + private readonly SemaphoreSlim semaphore; + private bool alreadyDisposed = false; + + public ReleaseableSemaphoreSlimWrapper(SemaphoreSlim semaphore) + => this.semaphore = semaphore; + + public void Dispose() + { + this.Dispose(true); + GC.SuppressFinalize(this); + } + + protected void Dispose(bool disposeActuallyCalled) + { + if (!this.alreadyDisposed) + { + if (disposeActuallyCalled) + { + this.semaphore?.Release(); + } + + this.alreadyDisposed = true; + } + } + } +} \ No newline at end of file diff --git a/tests/RealtimeConversation/ConversationTests.cs b/tests/RealtimeConversation/ConversationTests.cs index 4b171fd3b..a38fdecfc 100644 --- a/tests/RealtimeConversation/ConversationTests.cs +++ b/tests/RealtimeConversation/ConversationTests.cs @@ -1,6 +1,7 @@ using NUnit.Framework; using OpenAI.RealtimeConversation; using System; +using System.Buffers; using System.ClientModel; using System.ClientModel.Primitives; using System.Collections.Generic; @@ -8,6 +9,7 @@ using System.Linq; using System.Numerics; using System.Text; +using System.Threading; using System.Threading.Tasks; namespace OpenAI.Tests.Conversation; @@ -239,7 +241,47 @@ await session.AddItemAsync( } [Test] - public async Task AudioWithToolsWorks() + public async Task AudioStreamConvenienceBlocksCorrectly() + { + RealtimeConversationClient client = GetTestClient(); + using RealtimeConversationSession session = await client.StartConversationSessionAsync(CancellationToken); + + string inputAudioFilePath = Path.Join("Assets", "realtime_whats_the_weather_pcm16_24khz_mono.wav"); + using TestDelayedFileReadStream delayedStream = new(inputAudioFilePath, TimeSpan.FromMilliseconds(200), readsBeforeDelay: 2); + _ = session.SendInputAudioAsync(delayedStream, CancellationToken); + + bool gotSpeechStarted = false; + + await foreach (ConversationUpdate update in session.ReceiveUpdatesAsync(CancellationToken)) + { + if (update is ConversationInputSpeechStartedUpdate) + { + gotSpeechStarted = true; + Assert.ThrowsAsync( + async () => + { + using MemoryStream dummyStream = new(); + await session.SendInputAudioAsync(dummyStream, CancellationToken); + }, + "Sending a Stream while another Stream is being sent should throw!"); + Assert.ThrowsAsync( + async () => + { + BinaryData dummyData = BinaryData.FromString("hello, world! this isn't audio."); + await session.SendInputAudioAsync(dummyData, CancellationToken); + }, + "Sending BinaryData while a Stream is being sent should throw!"); + break; + } + } + + Assert.That(gotSpeechStarted, Is.True); + } + + [Test] + [TestCase(TestAudioSendType.WithAudioStreamHelper)] + [TestCase(TestAudioSendType.WithManualAudioChunks)] + public async Task AudioWithToolsWorks(TestAudioSendType audioSendType) { RealtimeConversationClient client = GetTestClient(); using RealtimeConversationSession session = await client.StartConversationSessionAsync(CancellationToken); @@ -285,8 +327,27 @@ public async Task AudioWithToolsWorks() await session.ConfigureSessionAsync(options, CancellationToken); - using Stream audioStream = File.OpenRead(Path.Join("Assets", "realtime_whats_the_weather_pcm16_24khz_mono.wav")); - _ = session.SendInputAudioAsync(audioStream, CancellationToken); + _ = Task.Run(async () => + { + string inputAudioFilePath = Path.Join("Assets", "realtime_whats_the_weather_pcm16_24khz_mono.wav"); + if (audioSendType == TestAudioSendType.WithAudioStreamHelper) + { + using Stream audioStream = File.OpenRead(inputAudioFilePath); + await session.SendInputAudioAsync(audioStream, CancellationToken); + } + else if (audioSendType == TestAudioSendType.WithManualAudioChunks) + { + byte[] allAudioBytes = await File.ReadAllBytesAsync(inputAudioFilePath, CancellationToken); + const int audioSendBufferLength = 8 * 1024; + byte[] audioSendBuffer = ArrayPool.Shared.Rent(audioSendBufferLength); + for (int readPos = 0; readPos < allAudioBytes.Length; readPos += audioSendBufferLength) + { + int nextSegmentLength = Math.Min(audioSendBufferLength, allAudioBytes.Length - readPos); + ArraySegment nextSegment = new(allAudioBytes, readPos, nextSegmentLength); + await session.SendInputAudioAsync(BinaryData.FromBytes(nextSegment), CancellationToken); + } + } + }); string userTranscript = null; @@ -465,4 +526,46 @@ public async Task CanAddItems() Assert.That(itemCreatedCount, Is.EqualTo(items.Count + 1)); } + + public enum TestAudioSendType + { + WithAudioStreamHelper, + WithManualAudioChunks + } + + private class TestDelayedFileReadStream : FileStream + { + private readonly TimeSpan _delayBetweenReads; + private readonly int _readsBeforeDelay; + private int _readsPerformed; + + public TestDelayedFileReadStream( + string path, + TimeSpan delayBetweenReads, + int readsBeforeDelay = 0) + : base(path, FileMode.Open, FileAccess.Read) + { + _delayBetweenReads = delayBetweenReads; + _readsBeforeDelay = readsBeforeDelay; + _readsPerformed = 0; + } + + public override int Read(byte[] buffer, int offset, int count) + { + if (++_readsPerformed > _readsBeforeDelay) + { + System.Threading.Thread.Sleep((int)_delayBetweenReads.TotalMilliseconds); + } + return base.Read(buffer, offset, count); + } + + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + if (++_readsPerformed > _readsBeforeDelay) + { + await Task.Delay(_delayBetweenReads); + } + return await base.ReadAsync(buffer, offset, count, cancellationToken); + } + } }