Skip to content

Commit ee76588

Browse files
authored
Add adapters to convert Azure.Core Response to SCM ClientResult to support SSE streams using SCM CollectionResult types (#46827)
* WIP on adapter * add response headers adapter * updates * updates
1 parent 7b3bff7 commit ee76588

File tree

6 files changed

+107
-7
lines changed

6 files changed

+107
-7
lines changed

sdk/ai/Azure.AI.Projects/src/Custom/Agent/AgentClient.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,9 @@ public virtual Task<Response<ThreadRun>> CreateRunAsync(AgentThread thread, Agen
165165
/// <param name="cancellationToken"> The cancellation token to use. </param>
166166
/// <exception cref="ArgumentNullException"> <paramref name="threadId"/> or <paramref name="assistantId"/> is null. </exception>
167167
/// <exception cref="ArgumentException"> <paramref name="threadId"/> is an empty string, and was expected to be non-empty. </exception>
168+
#pragma warning disable AZC0015 // Unexpected client method return type.
168169
public virtual AsyncCollectionResult<StreamingUpdate> CreateRunStreamingAsync(string threadId, string assistantId, string overrideModelName = null, string overrideInstructions = null, string additionalInstructions = null, IEnumerable<ThreadMessage> additionalMessages = null, IEnumerable<ToolDefinition> overrideTools = null, float? temperature = null, float? topP = null, int? maxPromptTokens = null, int? maxCompletionTokens = null, TruncationObject truncationStrategy = null, BinaryData toolChoice = null, BinaryData responseFormat = null, IReadOnlyDictionary<string, string> metadata = null, CancellationToken cancellationToken = default)
170+
#pragma warning restore AZC0015 // Unexpected client method return type.
169171
{
170172
Argument.AssertNotNullOrEmpty(threadId, nameof(threadId));
171173
Argument.AssertNotNull(assistantId, nameof(assistantId));
@@ -237,7 +239,9 @@ async Task<Response> sendRequestAsync() =>
237239
/// <param name="cancellationToken"> The cancellation token to use. </param>
238240
/// <exception cref="ArgumentNullException"> <paramref name="threadId"/> or <paramref name="assistantId"/> is null. </exception>
239241
/// <exception cref="ArgumentException"> <paramref name="threadId"/> is an empty string, and was expected to be non-empty. </exception>
242+
#pragma warning disable AZC0015 // Unexpected client method return type.
240243
public virtual CollectionResult<StreamingUpdate> CreateRunStreaming(string threadId, string assistantId, string overrideModelName = null, string overrideInstructions = null, string additionalInstructions = null, IEnumerable<ThreadMessage> additionalMessages = null, IEnumerable<ToolDefinition> overrideTools = null, float? temperature = null, float? topP = null, int? maxPromptTokens = null, int? maxCompletionTokens = null, TruncationObject truncationStrategy = null, BinaryData toolChoice = null, BinaryData responseFormat = null, IReadOnlyDictionary<string, string> metadata = null, CancellationToken cancellationToken = default)
244+
#pragma warning restore AZC0015 // Unexpected client method return type.
241245
{
242246
Argument.AssertNotNullOrEmpty(threadId, nameof(threadId));
243247
Argument.AssertNotNull(assistantId, nameof(assistantId));

sdk/ai/Azure.AI.Projects/src/Custom/Agent/Streaming/AsyncStreamingUpdateCollection.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ namespace Azure.AI.Projects;
1919
/// </summary>
2020
internal class AsyncStreamingUpdateCollection : AsyncCollectionResult<StreamingUpdate>
2121
{
22-
private readonly Func<Task<ClientResult>> _sendRequestAsync;
22+
private readonly Func<Task<Response>> _sendRequestAsync;
2323
private readonly CancellationToken _cancellationToken;
2424

25-
public AsyncStreamingUpdateCollection(Func<Task<ClientResult>> sendRequestAsync,
25+
public AsyncStreamingUpdateCollection(Func<Task<Response>> sendRequestAsync,
2626
CancellationToken cancellationToken)
2727
{
2828
Argument.AssertNotNull(sendRequestAsync, nameof(sendRequestAsync));
@@ -37,9 +37,12 @@ public AsyncStreamingUpdateCollection(Func<Task<ClientResult>> sendRequestAsync,
3737

3838
public async override IAsyncEnumerable<ClientResult> GetRawPagesAsync()
3939
{
40+
Response response = await _sendRequestAsync().ConfigureAwait(false);
41+
PipelineResponse scmResponse = new ResponseAdapter(response);
42+
4043
// We don't currently support resuming a dropped connection from the
4144
// last received event, so the response collection has a single element.
42-
yield return await _sendRequestAsync().ConfigureAwait(false);
45+
yield return ClientResult.FromResponse(scmResponse);
4346
}
4447

4548
protected async override IAsyncEnumerable<StreamingUpdate> GetValuesFromPageAsync(ClientResult page)

sdk/ai/Azure.AI.Projects/src/Custom/Agent/Streaming/StreamingUpdateCollection.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ namespace Azure.AI.Projects;
1818
/// </summary>
1919
internal class StreamingUpdateCollection : CollectionResult<StreamingUpdate>
2020
{
21-
private readonly Func<ClientResult> _sendRequest;
21+
private readonly Func<Response> _sendRequest;
2222
private readonly CancellationToken _cancellationToken;
2323

2424
public StreamingUpdateCollection(
25-
Func<ClientResult> sendRequest,
25+
Func<Response> sendRequest,
2626
CancellationToken cancellationToken)
2727
{
2828
Argument.AssertNotNull(sendRequest, nameof(sendRequest));
@@ -37,9 +37,12 @@ public StreamingUpdateCollection(
3737

3838
public override IEnumerable<ClientResult> GetRawPages()
3939
{
40+
Response response = _sendRequest();
41+
PipelineResponse scmResponse = new ResponseAdapter(response);
42+
4043
// We don't currently support resuming a dropped connection from the
4144
// last received event, so the response collection has a single element.
42-
yield return _sendRequest();
45+
yield return ClientResult.FromResponse(scmResponse);
4346
}
4447
protected override IEnumerable<StreamingUpdate> GetValuesFromPage(ClientResult page)
4548
{

sdk/ai/Azure.AI.Projects/src/Custom/Agent/Streaming/StreamingUpdateReason.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public enum StreamingUpdateReason
2020
/// Indicates that an update was generated as part of a <c>thread.created</c> event.
2121
/// </summary>
2222
/// <remarks> This reason is typically only associated with calls to
23-
/// <see cref="AgentClient.CreateThreadAndRunStreaming(Agent, ThreadCreationOptions, RunCreationOptions)"/>,
23+
/// <see cref="AgentClient.CreateThreadAndRun(string, AgentThreadCreationOptions, string, string, System.Collections.Generic.IEnumerable{ToolDefinition}, UpdateToolResourcesOptions, bool?, float?, float?, int?, int?, TruncationObject, System.BinaryData, System.BinaryData, System.Collections.Generic.IReadOnlyDictionary{string, string}, System.Threading.CancellationToken)"/>,
2424
/// as other run-related methods operate on a thread that has previously been created.
2525
/// </remarks>
2626
ThreadCreated,
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System;
5+
using System.ClientModel.Primitives;
6+
using System.IO;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
10+
#nullable enable
11+
12+
namespace Azure.AI.Projects;
13+
14+
/// <summary>
15+
/// Adapts an Azure.Core Response to an SCM PipelineResponse.
16+
/// </summary>
17+
internal class ResponseAdapter : PipelineResponse
18+
{
19+
private readonly Response _azureResponse;
20+
private PipelineResponseHeaders? _headers;
21+
22+
public ResponseAdapter(Response azureResponse)
23+
{
24+
_azureResponse = azureResponse;
25+
}
26+
27+
public override int Status => _azureResponse.Status;
28+
29+
public override string ReasonPhrase => _azureResponse.ReasonPhrase;
30+
31+
public override Stream? ContentStream
32+
{
33+
get => _azureResponse?.ContentStream;
34+
set => _azureResponse.ContentStream = value;
35+
}
36+
37+
public override BinaryData Content => _azureResponse.Content;
38+
39+
protected override PipelineResponseHeaders HeadersCore =>
40+
_headers ??= new ResponseHeadersAdapter(_azureResponse.Headers);
41+
42+
public override BinaryData BufferContent(CancellationToken cancellationToken = default)
43+
{
44+
throw new NotSupportedException("Content buffering is not supported for SSE response streams.");
45+
}
46+
47+
public override ValueTask<BinaryData> BufferContentAsync(CancellationToken cancellationToken = default)
48+
{
49+
throw new NotSupportedException("Content buffering is not supported for SSE response streams.");
50+
}
51+
52+
public override void Dispose() => _azureResponse?.Dispose();
53+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System.ClientModel.Primitives;
5+
using System.Collections.Generic;
6+
using Azure.Core;
7+
8+
#nullable enable
9+
10+
namespace Azure.AI.Projects;
11+
12+
/// <summary>
13+
/// Adapts an Azure.Core ResponseHeaders to an SCM PipelineResponseHeaders.
14+
/// </summary>
15+
internal class ResponseHeadersAdapter : PipelineResponseHeaders
16+
{
17+
private readonly ResponseHeaders _azureHeaders;
18+
19+
public ResponseHeadersAdapter(ResponseHeaders azureHeaders)
20+
{
21+
_azureHeaders = azureHeaders;
22+
}
23+
24+
public override IEnumerator<KeyValuePair<string, string>> GetEnumerator()
25+
{
26+
foreach (HttpHeader header in _azureHeaders)
27+
{
28+
yield return new KeyValuePair<string, string>(header.Name, header.Value);
29+
}
30+
}
31+
32+
public override bool TryGetValue(string name, out string? value)
33+
=> _azureHeaders.TryGetValue(name, out value);
34+
35+
public override bool TryGetValues(string name, out IEnumerable<string>? values)
36+
=> _azureHeaders.TryGetValue(name, out values);
37+
}

0 commit comments

Comments
 (0)