diff --git a/src/Elastic.Clients.Elasticsearch/Elastic.Clients.Elasticsearch.csproj b/src/Elastic.Clients.Elasticsearch/Elastic.Clients.Elasticsearch.csproj index 3010bacf06c..aeab1d403da 100644 --- a/src/Elastic.Clients.Elasticsearch/Elastic.Clients.Elasticsearch.csproj +++ b/src/Elastic.Clients.Elasticsearch/Elastic.Clients.Elasticsearch.csproj @@ -31,7 +31,7 @@ - + all diff --git a/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlQueryExecutor.cs b/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlQueryExecutor.cs index c791608f860..e8f3fddd86c 100644 --- a/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlQueryExecutor.cs +++ b/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlQueryExecutor.cs @@ -12,6 +12,7 @@ using Elastic.Esql.Execution; using Elastic.Esql.QueryModel; using Elastic.Transport; +using Elastic.Transport.Products.Elasticsearch; #if NET10_0_OR_GREATER using System.IO.Pipelines; @@ -37,7 +38,7 @@ public IEsqlResponse ExecuteQuery(string esql, EsqlParameters? parameters, objec var queryOptions = ResolveOptions(options); var request = BuildQueryRequest(esql, parameters, queryOptions); request.BeforeRequest(); - var response = _client.DoRequest(request); + var response = _client.DoRequest(request); return new EsqlTransportResponse(response); } @@ -46,8 +47,13 @@ public async Task ExecuteQueryAsync(string esql, EsqlParamet var queryOptions = ResolveOptions(options); var request = BuildQueryRequest(esql, parameters, queryOptions); request.BeforeRequest(); - var response = await _client.DoRequestAsync(request, cancellationToken) +#if NET10_0_OR_GREATER + var response = await _client.DoRequestAsync(request, cancellationToken) + .ConfigureAwait(false); +#else + var response = await _client.DoRequestAsync(request, cancellationToken) .ConfigureAwait(false); +#endif return new EsqlTransportAsyncResponse(response); } @@ -56,7 +62,7 @@ public IEsqlResponse SubmitAsyncQuery(string esql, EsqlParameters? parameters, o var queryOptions = ResolveOptions(options); var request = BuildAsyncQueryRequest(esql, parameters, queryOptions, asyncOptions); request.BeforeRequest(); - var response = _client.DoRequest(request); + var response = _client.DoRequest(request); return new EsqlTransportResponse(response); } @@ -65,8 +71,13 @@ public async Task SubmitAsyncQueryAsync(string esql, EsqlPar var queryOptions = ResolveOptions(options); var request = BuildAsyncQueryRequest(esql, parameters, queryOptions, asyncOptions); request.BeforeRequest(); - var response = await _client.DoRequestAsync(request, cancellationToken) +#if NET10_0_OR_GREATER + var response = await _client.DoRequestAsync(request, cancellationToken) .ConfigureAwait(false); +#else + var response = await _client.DoRequestAsync(request, cancellationToken) + .ConfigureAwait(false); +#endif return new EsqlTransportAsyncResponse(response); } @@ -77,7 +88,7 @@ public IEsqlResponse PollAsyncQuery(string queryId, object? options) if (queryOptions?.RequestConfiguration is not null) request.RequestConfiguration = queryOptions.RequestConfiguration; request.BeforeRequest(); - var response = _client.DoRequest(request); + var response = _client.DoRequest(request); return new EsqlTransportResponse(response); } @@ -88,8 +99,13 @@ public async Task PollAsyncQueryAsync(string queryId, object if (queryOptions?.RequestConfiguration is not null) request.RequestConfiguration = queryOptions.RequestConfiguration; request.BeforeRequest(); - var response = await _client.DoRequestAsync(request, cancellationToken) +#if NET10_0_OR_GREATER + var response = await _client.DoRequestAsync(request, cancellationToken) .ConfigureAwait(false); +#else + var response = await _client.DoRequestAsync(request, cancellationToken) + .ConfigureAwait(false); +#endif return new EsqlTransportAsyncResponse(response); } @@ -235,9 +251,9 @@ JsonValueKind.Number when element.TryGetInt64(out var l) => FieldValue.Long(l), internal sealed class EsqlTransportResponse : IEsqlResponse { - private readonly StreamResponse _response; + private readonly ElasticsearchStreamResponse _response; - public EsqlTransportResponse(StreamResponse response) => _response = response; + public EsqlTransportResponse(ElasticsearchStreamResponse response) => _response = response; public Stream Body => _response.Body; @@ -247,28 +263,21 @@ internal sealed class EsqlTransportResponse : IEsqlResponse #if NET10_0_OR_GREATER internal sealed class EsqlTransportAsyncResponse : IEsqlAsyncResponse { - private readonly StreamResponse _response; + private readonly ElasticsearchPipeResponse _response; - public EsqlTransportAsyncResponse(StreamResponse response) - { - _response = response; - Body = PipeReader.Create(response.Body); - } + public EsqlTransportAsyncResponse(ElasticsearchPipeResponse response) => _response = response; - public PipeReader Body { get; } + public PipeReader Body => _response.Body; - public async ValueTask DisposeAsync() - { - await Body.CompleteAsync().ConfigureAwait(false); - _response.Dispose(); - } + public async ValueTask DisposeAsync() => + await _response.DisposeAsync().ConfigureAwait(false); } #else internal sealed class EsqlTransportAsyncResponse : IEsqlAsyncResponse { - private readonly StreamResponse _response; + private readonly ElasticsearchStreamResponse _response; - public EsqlTransportAsyncResponse(StreamResponse response) => _response = response; + public EsqlTransportAsyncResponse(ElasticsearchStreamResponse response) => _response = response; public Stream Body => _response.Body; diff --git a/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlQueryRequest.cs b/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlQueryRequest.cs deleted file mode 100644 index 0bd7841bfd9..00000000000 --- a/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlQueryRequest.cs +++ /dev/null @@ -1,62 +0,0 @@ -// Licensed to Elasticsearch B.V under one or more agreements. -// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. -// See the LICENSE file in the project root for more information. - -using System.IO; -using System.Runtime.CompilerServices; -using System.Threading; -using System.Threading.Tasks; - -using Elastic.Transport; - -namespace Elastic.Clients.Elasticsearch.Esql; - -internal sealed class EsqlResponseBuilder : TypedResponseBuilder -{ - protected override EsqlQueryResponse? Build(ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration, - Stream responseStream, - string contentType, long contentLength) - { - var bytes = responseStream switch - { - MemoryStream ms => ms.ToArray(), - _ => BytesFromStream(responseStream) - }; - - return new EsqlQueryResponse { Data = bytes }; - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - static byte[] BytesFromStream(Stream stream) - { - using var binaryReader = new BinaryReader(stream); - - return binaryReader.ReadBytes((int)stream.Length); - } - } - - protected override async Task BuildAsync(ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration, - Stream responseStream, - string contentType, long contentLength, CancellationToken cancellationToken = default) - { - var bytes = responseStream switch - { - MemoryStream ms => ms.ToArray(), - _ => await BytesFromStreamAsync(responseStream, cancellationToken).ConfigureAwait(false) - }; - - return new EsqlQueryResponse { Data = bytes }; - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - static async Task BytesFromStreamAsync(Stream stream, CancellationToken cancellationToken) - { - using var ms = new MemoryStream(); -#if NETSTANDARD2_1_OR_GREATER || NET6_0_OR_GREATER - await stream.CopyToAsync(ms, cancellationToken).ConfigureAwait(false); -#else - await stream.CopyToAsync(ms).ConfigureAwait(false); -#endif - - return ms.ToArray(); - } - } -} diff --git a/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlQueryResponse.cs b/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlQueryResponse.cs deleted file mode 100644 index a8f9904bae4..00000000000 --- a/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlQueryResponse.cs +++ /dev/null @@ -1,15 +0,0 @@ -// Licensed to Elasticsearch B.V under one or more agreements. -// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. -// See the LICENSE file in the project root for more information. - -#if ELASTICSEARCH_SERVERLESS -namespace Elastic.Clients.Elasticsearch.Serverless.Esql; -#else - -namespace Elastic.Clients.Elasticsearch.Esql; -#endif - -public sealed partial class EsqlQueryResponse -{ - public byte[] Data { get; init; } -} diff --git a/src/Elastic.Clients.Elasticsearch/_Shared/Client/ElasticsearchClient.Esql.cs b/src/Elastic.Clients.Elasticsearch/_Shared/Client/ElasticsearchClient.Esql.cs index 743ffba4465..7ba09d37612 100644 --- a/src/Elastic.Clients.Elasticsearch/_Shared/Client/ElasticsearchClient.Esql.cs +++ b/src/Elastic.Clients.Elasticsearch/_Shared/Client/ElasticsearchClient.Esql.cs @@ -41,7 +41,7 @@ public partial class EsqlNamespacedClient /// /// The ES|QL query result as a generic stream response. /// The response must be disposed after use. - public virtual Task QueryAsStreamAsync( + public virtual Task QueryAsStreamAsync( Action> configureRequest, CancellationToken cancellationToken = default) { @@ -49,7 +49,7 @@ public virtual Task QueryAsStreamAsync( configureRequest?.Invoke(descriptor); var request = descriptor.Instance; request.BeforeRequest(); - return DoRequestAsync(request, cancellationToken); + return DoRequestAsync(request, cancellationToken); } /// @@ -57,7 +57,7 @@ public virtual Task QueryAsStreamAsync( /// /// The ES|QL query result as a generic stream response. /// The response must be disposed after use. - public virtual Task QueryAsStreamAsync( + public virtual Task QueryAsStreamAsync( Action configureRequest, CancellationToken cancellationToken = default) { @@ -65,7 +65,7 @@ public virtual Task QueryAsStreamAsync( configureRequest?.Invoke(descriptor); var request = descriptor.Instance; request.BeforeRequest(); - return DoRequestAsync(request, cancellationToken); + return DoRequestAsync(request, cancellationToken); } #endregion diff --git a/src/Elastic.Clients.Elasticsearch/_Shared/Core/Configuration/ElasticsearchClientSettings.cs b/src/Elastic.Clients.Elasticsearch/_Shared/Core/Configuration/ElasticsearchClientSettings.cs index 407b0a6c82f..36573b9514a 100644 --- a/src/Elastic.Clients.Elasticsearch/_Shared/Core/Configuration/ElasticsearchClientSettings.cs +++ b/src/Elastic.Clients.Elasticsearch/_Shared/Core/Configuration/ElasticsearchClientSettings.cs @@ -436,7 +436,6 @@ protected ConnectionConfigurationBase( : base(nodePool, requestInvoker, serializer, registration ?? new ElasticsearchProductRegistration(typeof(ElasticsearchClient))) { UserAgent(ConnectionConfiguration.DefaultUserAgent); - ResponseBuilder(new EsqlResponseBuilder()); } bool TransportClientConfigurationValues.IncludeServerStackTraceOnError => _includeServerStackTraceOnError;