diff --git a/src/Elastic.Clients.Elasticsearch/Elastic.Clients.Elasticsearch.csproj b/src/Elastic.Clients.Elasticsearch/Elastic.Clients.Elasticsearch.csproj index 537c7bc339d..9ce1e25113f 100644 --- a/src/Elastic.Clients.Elasticsearch/Elastic.Clients.Elasticsearch.csproj +++ b/src/Elastic.Clients.Elasticsearch/Elastic.Clients.Elasticsearch.csproj @@ -1,4 +1,4 @@ - + Elastic.Clients.Elasticsearch @@ -31,6 +31,7 @@ + all diff --git a/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlQueryExecutor.cs b/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlQueryExecutor.cs new file mode 100644 index 00000000000..c791608f860 --- /dev/null +++ b/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlQueryExecutor.cs @@ -0,0 +1,281 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Elastic.Esql; +using Elastic.Esql.Execution; +using Elastic.Esql.QueryModel; +using Elastic.Transport; + +#if NET10_0_OR_GREATER +using System.IO.Pipelines; +#endif + +namespace Elastic.Clients.Elasticsearch.Esql; + +/// +/// Implements by delegating to the native +/// typed request/response pipeline. +/// +internal sealed class EsqlQueryExecutor : IEsqlQueryExecutor +{ + private readonly EsqlNamespacedClient _client; + + public EsqlQueryExecutor(EsqlNamespacedClient client) + { + _client = client ?? throw new ArgumentNullException(nameof(client)); + } + + public IEsqlResponse ExecuteQuery(string esql, EsqlParameters? parameters, object? options) + { + var queryOptions = ResolveOptions(options); + var request = BuildQueryRequest(esql, parameters, queryOptions); + request.BeforeRequest(); + var response = _client.DoRequest(request); + return new EsqlTransportResponse(response); + } + + public async Task ExecuteQueryAsync(string esql, EsqlParameters? parameters, object? options, CancellationToken cancellationToken) + { + var queryOptions = ResolveOptions(options); + var request = BuildQueryRequest(esql, parameters, queryOptions); + request.BeforeRequest(); + var response = await _client.DoRequestAsync(request, cancellationToken) + .ConfigureAwait(false); + return new EsqlTransportAsyncResponse(response); + } + + public IEsqlResponse SubmitAsyncQuery(string esql, EsqlParameters? parameters, object? options, EsqlAsyncQueryOptions? asyncOptions) + { + var queryOptions = ResolveOptions(options); + var request = BuildAsyncQueryRequest(esql, parameters, queryOptions, asyncOptions); + request.BeforeRequest(); + var response = _client.DoRequest(request); + return new EsqlTransportResponse(response); + } + + public async Task SubmitAsyncQueryAsync(string esql, EsqlParameters? parameters, object? options, EsqlAsyncQueryOptions? asyncOptions, CancellationToken cancellationToken) + { + var queryOptions = ResolveOptions(options); + var request = BuildAsyncQueryRequest(esql, parameters, queryOptions, asyncOptions); + request.BeforeRequest(); + var response = await _client.DoRequestAsync(request, cancellationToken) + .ConfigureAwait(false); + return new EsqlTransportAsyncResponse(response); + } + + public IEsqlResponse PollAsyncQuery(string queryId, object? options) + { + var queryOptions = ResolveOptions(options); + var request = new AsyncQueryGetRequest(queryId) { Format = EsqlFormat.Json }; + if (queryOptions?.RequestConfiguration is not null) + request.RequestConfiguration = queryOptions.RequestConfiguration; + request.BeforeRequest(); + var response = _client.DoRequest(request); + return new EsqlTransportResponse(response); + } + + public async Task PollAsyncQueryAsync(string queryId, object? options, CancellationToken cancellationToken) + { + var queryOptions = ResolveOptions(options); + var request = new AsyncQueryGetRequest(queryId) { Format = EsqlFormat.Json }; + if (queryOptions?.RequestConfiguration is not null) + request.RequestConfiguration = queryOptions.RequestConfiguration; + request.BeforeRequest(); + var response = await _client.DoRequestAsync(request, cancellationToken) + .ConfigureAwait(false); + return new EsqlTransportAsyncResponse(response); + } + + public void DeleteAsyncQuery(string queryId, object? options) + { + var queryOptions = ResolveOptions(options); + var request = new AsyncQueryDeleteRequest(queryId); + if (queryOptions?.RequestConfiguration is not null) + request.RequestConfiguration = queryOptions.RequestConfiguration; + request.BeforeRequest(); + _client.DoRequest(request); + } + + public async Task DeleteAsyncQueryAsync(string queryId, object? options, CancellationToken cancellationToken) + { + var queryOptions = ResolveOptions(options); + var request = new AsyncQueryDeleteRequest(queryId); + if (queryOptions?.RequestConfiguration is not null) + request.RequestConfiguration = queryOptions.RequestConfiguration; + request.BeforeRequest(); + await _client.DoRequestAsync(request, cancellationToken) + .ConfigureAwait(false); + } + + private static EsqlQueryOptions? ResolveOptions(object? options) => + options as EsqlQueryOptions; + + private static EsqlQueryRequest BuildQueryRequest(string esql, EsqlParameters? parameters, EsqlQueryOptions? options) + { + var request = new EsqlQueryRequest(esql) + { + Format = EsqlFormat.Json, + Columnar = false, + Params = MergeAndConvertParams(parameters, options?.NamedParameters) + }; + + ApplyQueryOptions(request, options); + return request; + } + + private static AsyncQueryRequest BuildAsyncQueryRequest(string esql, EsqlParameters? parameters, EsqlQueryOptions? queryOptions, EsqlAsyncQueryOptions? asyncOptions) + { + var request = new AsyncQueryRequest(esql) + { + Format = EsqlFormat.Json, + Columnar = false, + Params = MergeAndConvertParams(parameters, queryOptions?.NamedParameters) + }; + + ApplyQueryOptions(request, queryOptions); + + if (asyncOptions is not null) + { + if (asyncOptions.WaitForCompletionTimeout is { } waitTimeout) + request.WaitForCompletionTimeout = new Duration(waitTimeout); + + if (asyncOptions.KeepAlive is { } keepAlive) + request.KeepAlive = new Duration(keepAlive); + + request.KeepOnCompletion = asyncOptions.KeepOnCompletion; + } + + return request; + } + + private static void ApplyQueryOptions(EsqlQueryRequest request, EsqlQueryOptions? options) + { + if (options is null) + return; + + request.Locale = options.Locale; + request.TimeZone = options.TimeZone; + request.Filter = options.Filter; + request.AllowPartialResults = options.AllowPartialResults; + request.DropNullColumns = options.DropNullColumns; + request.ProjectRouting = options.ProjectRouting; + + if (options.RequestConfiguration is not null) + request.RequestConfiguration = options.RequestConfiguration; + } + + private static void ApplyQueryOptions(AsyncQueryRequest request, EsqlQueryOptions? options) + { + if (options is null) + return; + + request.Locale = options.Locale; + request.TimeZone = options.TimeZone; + request.Filter = options.Filter; + request.AllowPartialResults = options.AllowPartialResults; + request.DropNullColumns = options.DropNullColumns; + request.ProjectRouting = options.ProjectRouting; + + if (options.RequestConfiguration is not null) + request.RequestConfiguration = options.RequestConfiguration; + } + + private static Union>, ICollection>>>? + MergeAndConvertParams(EsqlParameters? translated, Dictionary? userParams) + { + var hasTranslated = translated is not null && translated.HasParameters; + var hasUser = userParams is { Count: > 0 }; + + if (!hasTranslated && !hasUser) + return null; + + var merged = new Dictionary(); + + if (hasTranslated) + { + foreach (var kvp in translated!.Parameters) + merged[kvp.Key] = ConvertJsonElement(kvp.Value); + } + + if (hasUser) + { + foreach (var kvp in userParams!) + merged[kvp.Key] = kvp.Value; + } + + var namedParams = new List>>(merged.Count); + foreach (var kvp in merged) + { + namedParams.Add(new KeyValuePair>( + kvp.Key, [kvp.Value])); + } + + return new Union>, ICollection>>>(namedParams); + + static FieldValue ConvertJsonElement(JsonElement element) => + element.ValueKind switch + { + JsonValueKind.String => FieldValue.String(element.GetString()!), + JsonValueKind.Number when element.TryGetInt64(out var l) => FieldValue.Long(l), + JsonValueKind.Number => FieldValue.Double(element.GetDouble()), + JsonValueKind.True => FieldValue.True, + JsonValueKind.False => FieldValue.False, + JsonValueKind.Null or JsonValueKind.Undefined => FieldValue.Null, + _ => FieldValue.String(element.GetRawText()) + }; + } +} + +internal sealed class EsqlTransportResponse : IEsqlResponse +{ + private readonly StreamResponse _response; + + public EsqlTransportResponse(StreamResponse response) => _response = response; + + public Stream Body => _response.Body; + + public void Dispose() => _response.Dispose(); +} + +#if NET10_0_OR_GREATER +internal sealed class EsqlTransportAsyncResponse : IEsqlAsyncResponse +{ + private readonly StreamResponse _response; + + public EsqlTransportAsyncResponse(StreamResponse response) + { + _response = response; + Body = PipeReader.Create(response.Body); + } + + public PipeReader Body { get; } + + public async ValueTask DisposeAsync() + { + await Body.CompleteAsync().ConfigureAwait(false); + _response.Dispose(); + } +} +#else +internal sealed class EsqlTransportAsyncResponse : IEsqlAsyncResponse +{ + private readonly StreamResponse _response; + + public EsqlTransportAsyncResponse(StreamResponse response) => _response = response; + + public Stream Body => _response.Body; + + public ValueTask DisposeAsync() + { + _response.Dispose(); + return default; + } +} +#endif diff --git a/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlQueryOptions.cs b/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlQueryOptions.cs new file mode 100644 index 00000000000..9ed5b0d50d4 --- /dev/null +++ b/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlQueryOptions.cs @@ -0,0 +1,40 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +using System.Collections.Generic; + +using Elastic.Transport; + +namespace Elastic.Clients.Elasticsearch.Esql; + +/// Per-query options for LINQ-to-ES|QL queries executed via WithOptions. +public sealed record EsqlQueryOptions +{ + /// Per-request transport configuration (timeouts, headers, auth). + public IRequestConfiguration? RequestConfiguration { get; init; } + + /// If true, partial results will be returned on shard failures. + public bool? AllowPartialResults { get; init; } + + /// If true, entirely null columns are removed from the response. + public bool? DropNullColumns { get; init; } + + /// A Query DSL filter applied to the document set before the ES|QL query runs. + public QueryDsl.Query? Filter { get; init; } + + /// Locale for result formatting (e.g., "en-US"). + public string? Locale { get; init; } + + /// Project routing for serverless cross-project queries. + public string? ProjectRouting { get; init; } + + /// Default timezone for date operations (e.g., "UTC"). + public string? TimeZone { get; init; } + + /// + /// User-supplied named parameters. Merged with parameters from the translated query. + /// If a key exists in both, NamedParameters takes precedence. + /// + public Dictionary? NamedParameters { get; init; } +} diff --git a/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlQueryableExtensions.cs b/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlQueryableExtensions.cs new file mode 100644 index 00000000000..6a361099560 --- /dev/null +++ b/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlQueryableExtensions.cs @@ -0,0 +1,22 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +using System; +using System.Linq.Expressions; + +using Elastic.Esql.Core; + +namespace Elastic.Clients.Elasticsearch.Esql; + +/// Extension methods for attaching query options to LINQ-to-ES|QL queries. +public static class EsqlQueryableExtensions +{ + /// Attaches ES|QL query options to the query pipeline. + public static IEsqlQueryable WithOptions(this IEsqlQueryable source, EsqlQueryOptions options) + { + var method = new Func, EsqlQueryOptions, IEsqlQueryable>(WithOptions).Method; + return (IEsqlQueryable)source.Provider.CreateQuery( + Expression.Call(null, method, source.Expression, Expression.Constant(options))); + } +} diff --git a/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlSourceInferenceInterceptor.cs b/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlSourceInferenceInterceptor.cs new file mode 100644 index 00000000000..0c2ad959955 --- /dev/null +++ b/src/Elastic.Clients.Elasticsearch/_Shared/Api/Esql/EsqlSourceInferenceInterceptor.cs @@ -0,0 +1,24 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +using Elastic.Esql.Core; +using Elastic.Esql.QueryModel; + +namespace Elastic.Clients.Elasticsearch.Esql; + +internal sealed class EsqlSourceInferenceInterceptor : IEsqlQueryInterceptor +{ + private readonly Inferrer _inferrer; + + public EsqlSourceInferenceInterceptor(Inferrer inferrer) => _inferrer = inferrer; + + public EsqlQuery Intercept(EsqlQuery query) + { + if (query.Source is not null) + return query; + + var indexName = _inferrer.IndexName(query.ElementType); + return query.WithSource(indexName); + } +} diff --git a/src/Elastic.Clients.Elasticsearch/_Shared/Client/ElasticsearchClient.Esql.cs b/src/Elastic.Clients.Elasticsearch/_Shared/Client/ElasticsearchClient.Esql.cs index 75b5f395ab5..8ad44918bb0 100644 --- a/src/Elastic.Clients.Elasticsearch/_Shared/Client/ElasticsearchClient.Esql.cs +++ b/src/Elastic.Clients.Elasticsearch/_Shared/Client/ElasticsearchClient.Esql.cs @@ -11,7 +11,12 @@ using System.Text.Json.Serialization; using System.Threading.Tasks; using System.Threading; +using Elastic.Esql; +using Elastic.Esql.Core; +using Elastic.Esql.Execution; +using Elastic.Esql.Extensions; using Elastic.Transport; +using Elastic.Transport.Extensions; namespace Elastic.Clients.Elasticsearch.Esql; @@ -26,6 +31,10 @@ public partial class EsqlNamespacedClient #pragma warning restore IL2026, IL3050 + private EsqlQueryProvider? _queryProvider; + + #region Stream response ES|QL query methods + /// /// Executes an ES|QL request and returns the response as a stream. /// @@ -58,6 +67,162 @@ public virtual Task QueryAsStreamAsync( return DoRequestAsync(request, cancellationToken); } + #endregion + + #region LINQ to ES|QL query methods + + /// + /// Creates a new LINQ-to-ES|QL queryable for the specified entity type. + /// + /// The entity type to query. + /// Optional per-query options (filter, timeouts, parameters, etc.). + /// An that can be used to build and execute ES|QL queries via LINQ. + public IEsqlQueryable CreateQuery(EsqlQueryOptions? queryOptions = null) where T : class + => (IEsqlQueryable)ApplyOptions(new EsqlQueryable(GetOrCreateQueryProvider()), queryOptions); + + /// + /// Executes a synchronous LINQ-to-ES|QL query and returns the results. + /// + /// The entity type to query. + /// A function that builds the query from an . + /// Optional per-query options (filter, timeouts, parameters, etc.). + /// An containing the query results. + public IEnumerable Query( + Func, IQueryable> query, + EsqlQueryOptions? queryOptions = null) where T : class => + query(CreateQuery(queryOptions)); + + /// + /// Executes a synchronous LINQ-to-ES|QL query with a projection and returns the results. + /// + /// The source entity type. + /// The projected result type. + /// A function that builds the query from an . + /// Optional per-query options (filter, timeouts, parameters, etc.). + /// An containing the query results. + public IEnumerable Query( + Func, IQueryable> query, + EsqlQueryOptions? queryOptions = null) where T : class => + query(CreateQuery(queryOptions)); + + /// + /// Asynchronously executes a LINQ-to-ES|QL query and returns the results as an async stream. + /// + /// The entity type to query. + /// A function that builds the query from an . + /// Optional per-query options (filter, timeouts, parameters, etc.). + /// A cancellation token. + /// An containing the query results. + /// Thrown if the query does not return an . + public IAsyncEnumerable QueryAsync( + Func, IQueryable> query, + EsqlQueryOptions? queryOptions = null, + CancellationToken cancellationToken = default) where T : class => + query(CreateQuery(queryOptions)).AsEsqlQueryable().AsAsyncEnumerable(cancellationToken); + + /// + /// Asynchronously executes a LINQ-to-ES|QL query with a projection and returns the results as an async stream. + /// + /// The source entity type. + /// The projected result type. + /// A function that builds the query from an . + /// Optional per-query options (filter, timeouts, parameters, etc.). + /// A cancellation token. + /// An containing the query results. + /// Thrown if the query does not return an . + public IAsyncEnumerable QueryAsync( + Func, IQueryable> query, + EsqlQueryOptions? queryOptions = null, + CancellationToken cancellationToken = default) where T : class => + query(CreateQuery(queryOptions)).AsEsqlQueryable().AsAsyncEnumerable(cancellationToken); + + /// + /// Submits a LINQ-to-ES|QL query as a server-side async query. + /// + /// The entity type to query. + /// A function that builds the query from an . + /// Optional per-query options (filter, timeouts, parameters, etc.). + /// Optional async query options. + /// An that can be used to poll and retrieve results. + /// Thrown if the query does not return an . + public EsqlAsyncQuery SubmitAsyncQuery( + Func, IQueryable> query, + EsqlQueryOptions? queryOptions = null, + EsqlAsyncQueryOptions? asyncQueryOptions = null) where T : class => + query(CreateQuery(queryOptions)).AsEsqlQueryable().ToAsyncQuery(asyncQueryOptions); + + /// + /// Submits a LINQ-to-ES|QL query with a projection as a server-side async query. + /// + public EsqlAsyncQuery SubmitAsyncQuery( + Func, IQueryable> query, + EsqlQueryOptions? queryOptions = null, + EsqlAsyncQueryOptions? asyncQueryOptions = null) where T : class => + query(CreateQuery(queryOptions)).AsEsqlQueryable().ToAsyncQuery(asyncQueryOptions); + + /// + /// Asynchronously submits a LINQ-to-ES|QL query as a server-side async query. + /// + /// The entity type to query. + /// A function that builds the query from an . + /// Optional per-query options (filter, timeouts, parameters, etc.). + /// Optional async query options. + /// A cancellation token. + /// An that can be used to poll and retrieve results. + /// Thrown if the query does not return an . + public Task> SubmitAsyncQueryAsync( + Func, IQueryable> query, + EsqlQueryOptions? queryOptions = null, + EsqlAsyncQueryOptions? asyncQueryOptions = null, + CancellationToken cancellationToken = default) where T : class => + query(CreateQuery(queryOptions)).AsEsqlQueryable().ToAsyncQueryAsync(asyncQueryOptions, cancellationToken); + + /// + /// Asynchronously submits a LINQ-to-ES|QL query with a projection as a server-side async query. + /// + public Task> SubmitAsyncQueryAsync( + Func, IQueryable> query, + EsqlQueryOptions? queryOptions = null, + EsqlAsyncQueryOptions? asyncQueryOptions = null, + CancellationToken cancellationToken = default) where T : class => + query(CreateQuery(queryOptions)).AsEsqlQueryable().ToAsyncQueryAsync(asyncQueryOptions, cancellationToken); + + private static IQueryable ApplyOptions(IQueryable queryable, EsqlQueryOptions? queryOptions) + { + if (queryOptions is null) + return queryable; + + if (queryable is not IEsqlQueryable esqlQueryable) + throw new InvalidOperationException("Query must return an IEsqlQueryable."); + + return esqlQueryable.WithOptions(queryOptions); + } + + private EsqlQueryProvider GetOrCreateQueryProvider() + { + if (_queryProvider is not null) + return _queryProvider; + + if (!Client.SourceSerializer.TryGetJsonSerializerOptions(out var options)) + { + throw new InvalidOperationException( + "The SourceSerializer does not support JsonSerializerOptions. An EsqlQueryProvider cannot be created."); + } + + var executor = new EsqlQueryExecutor(this); + var provider = new EsqlQueryProvider(options, executor) + { + Interceptor = new EsqlSourceInferenceInterceptor(Client.Infer) + }; + Interlocked.CompareExchange(ref _queryProvider, provider, null); + return _queryProvider!; + } + + #endregion + + #region Legacy ES|QL query methods + + [Obsolete("Use CreateQuery() for LINQ-based ES|QL queries.")] public virtual async Task> QueryAsObjectsAsync( Action> configureRequest, CancellationToken cancellationToken = default) @@ -77,10 +242,9 @@ void Configure(EsqlQueryRequestDescriptor descriptor) } } + [Obsolete("Use CreateQuery() for LINQ-based ES|QL queries.")] private static IEnumerable EsqlToObject(ElasticsearchClient client, EsqlQueryResponse response) { - // TODO: Improve performance - #pragma warning disable IL2026, IL3050 using var doc = JsonSerializer.Deserialize(response.Data, EsqlJsonSerializerOptions) ?? throw new JsonException(); #pragma warning restore IL2026, IL3050 @@ -138,6 +302,8 @@ private static IEnumerable EsqlToObject(ElasticsearchClient client, EsqlQu } } + #endregion + [JsonSerializable(typeof(JsonDocument))] internal sealed partial class EsqlJsonSerializerContext : JsonSerializerContext; diff --git a/tests/Tests.Core/Tests.Core.csproj b/tests/Tests.Core/Tests.Core.csproj index 6bd0841af68..2e891a67faa 100644 --- a/tests/Tests.Core/Tests.Core.csproj +++ b/tests/Tests.Core/Tests.Core.csproj @@ -12,7 +12,7 @@ - +