diff --git a/src/Elastic.Transport/Products/Elasticsearch/ElasticsearchErrorExtensions.cs b/src/Elastic.Transport/Products/Elasticsearch/ElasticsearchErrorExtensions.cs
index 85a43609..a211eb89 100644
--- a/src/Elastic.Transport/Products/Elasticsearch/ElasticsearchErrorExtensions.cs
+++ b/src/Elastic.Transport/Products/Elasticsearch/ElasticsearchErrorExtensions.cs
@@ -14,6 +14,10 @@ public static class ElasticsearchErrorExtensions
/// Try to parse an Elasticsearch
public static bool TryGetElasticsearchServerError(this StringResponse response, out ElasticsearchServerError? serverError)
{
+ // Prefer the factory-extracted error if available
+ if (TryGetFactoryExtractedError(response, out serverError))
+ return true;
+
serverError = null;
if (string.IsNullOrEmpty(response.Body) || response.ApiCallDetails.ResponseContentType != BoundConfiguration.DefaultContentType)
return false;
@@ -26,6 +30,10 @@ public static bool TryGetElasticsearchServerError(this StringResponse response,
/// Try to parse an Elasticsearch
public static bool TryGetElasticsearchServerError(this BytesResponse response, out ElasticsearchServerError? serverError)
{
+ // Prefer the factory-extracted error if available
+ if (TryGetFactoryExtractedError(response, out serverError))
+ return true;
+
serverError = null;
if (response.Body == null || response.Body.Length == 0 || response.ApiCallDetails.ResponseContentType != BoundConfiguration.DefaultContentType)
return false;
@@ -41,6 +49,10 @@ public static bool TryGetElasticsearchServerError(this BytesResponse response, o
///
public static bool TryGetElasticsearchServerError(this TransportResponse response, out ElasticsearchServerError? serverError)
{
+ // Prefer the factory-extracted error if available
+ if (TryGetFactoryExtractedError(response, out serverError))
+ return true;
+
serverError = null;
var bytes = response.ApiCallDetails.ResponseBodyInBytes;
if (bytes == null || response.ApiCallDetails.ResponseContentType != BoundConfiguration.DefaultContentType)
@@ -50,4 +62,10 @@ public static bool TryGetElasticsearchServerError(this TransportResponse respons
using var stream = settings.MemoryStreamFactory.Create(bytes);
return ElasticsearchServerError.TryCreate(stream, out serverError);
}
+
+ private static bool TryGetFactoryExtractedError(TransportResponse response, out ElasticsearchServerError? serverError)
+ {
+ serverError = response.ApiCallDetails?.ProductError as ElasticsearchServerError;
+ return serverError?.HasError() ?? false;
+ }
}
diff --git a/src/Elastic.Transport/Products/Elasticsearch/ElasticsearchProductRegistration.cs b/src/Elastic.Transport/Products/Elasticsearch/ElasticsearchProductRegistration.cs
index 089a5da7..ac9281b7 100644
--- a/src/Elastic.Transport/Products/Elasticsearch/ElasticsearchProductRegistration.cs
+++ b/src/Elastic.Transport/Products/Elasticsearch/ElasticsearchProductRegistration.cs
@@ -6,6 +6,7 @@
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Diagnostics;
+using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading;
@@ -24,8 +25,6 @@ public class ElasticsearchProductRegistration : ProductRegistration
internal const string XFoundHandlingClusterHeader = "X-Found-Handling-Cluster";
internal const string XFoundHandlingInstanceHeader = "X-Found-Handling-Instance";
- private readonly HeadersList _headers;
- private readonly MetaHeaderProvider _metaHeaderProvider;
private readonly int? _clientMajorVersion;
private static string? _clusterName;
@@ -37,8 +36,8 @@ public class ElasticsearchProductRegistration : ProductRegistration
///
internal ElasticsearchProductRegistration()
{
- _headers = new HeadersList("warning");
- _metaHeaderProvider = null!;
+ ResponseHeadersToParse = new HeadersList("warning");
+ MetaHeaderProvider = null!;
ProductAssemblyVersion = null!;
}
@@ -52,7 +51,7 @@ public ElasticsearchProductRegistration(Type markerType) : this()
var identifier = ServiceIdentifier;
if (!string.IsNullOrEmpty(identifier))
- _metaHeaderProvider = new DefaultMetaHeaderProvider(clientVersionInfo, identifier!);
+ MetaHeaderProvider = new DefaultMetaHeaderProvider(clientVersionInfo, identifier!);
// Only set this if we have a version.
// If we don't have a version we won't apply the vendor-based REST API compatibility Accept header.
@@ -71,7 +70,7 @@ public ElasticsearchProductRegistration(Type markerType) : this()
public override string Name { get; } = "elasticsearch-net";
///
- public override string? ServiceIdentifier => "es";
+ public sealed override string ServiceIdentifier => "es";
///
public override bool SupportsPing { get; } = true;
@@ -80,10 +79,10 @@ public ElasticsearchProductRegistration(Type markerType) : this()
public override bool SupportsSniff { get; } = true;
///
- public override HeadersList ResponseHeadersToParse => _headers;
+ public override HeadersList ResponseHeadersToParse { get; }
///
- public override MetaHeaderProvider MetaHeaderProvider => _metaHeaderProvider;
+ public override MetaHeaderProvider MetaHeaderProvider { get; }
///
public override string? DefaultContentType => _clientMajorVersion.HasValue ? $"application/vnd.elasticsearch+json;compatible-with={_clientMajorVersion.Value}" : null;
@@ -103,11 +102,17 @@ public override int SniffOrder(Node node) =>
/// API calls. They are considered for ping and sniff requests.
///
public override bool NodePredicate(Node node) =>
- // skip master only nodes (holds no data and is master eligible)
+ // Skip master only nodes (holds no data and is master eligible)
!(node.HasFeature(ElasticsearchNodeFeatures.MasterEligible) &&
!node.HasFeature(ElasticsearchNodeFeatures.HoldsData));
///
+ ///
+ /// We consider all status codes >= 200 and < 300 valid by default.
+ /// Elasticsearch might return 404 for valid responses in some cases (e.g. `GET /my-index/_doc/missing-doc-id`) but also for actual error cases like
+ /// missing endpoints, missing indices (e.g. `GET /missing-index/_mapping`), etc.
+ /// The 404 case is handled on a per-request basis (see for details).
+ ///
public override bool HttpStatusCodeClassifier(HttpMethod method, int statusCode) =>
statusCode is >= 200 and < 300;
@@ -115,13 +120,14 @@ public override bool HttpStatusCodeClassifier(HttpMethod method, int statusCode)
public override bool TryGetServerErrorReason(TResponse response, out string? reason)
{
reason = null;
- if (response is StringResponse s && s.TryGetElasticsearchServerError(out var e))
- reason = e?.Error?.ToString();
- else if (response is BytesResponse b && b.TryGetElasticsearchServerError(out e))
- reason = e?.Error?.ToString();
- else if (response.TryGetElasticsearchServerError(out e))
- reason = e?.Error?.ToString();
- return e != null;
+
+ if (response.ApiCallDetails?.ProductError is ElasticsearchServerError error && error.HasError())
+ {
+ reason = error.Error?.ToString();
+ return true;
+ }
+
+ return false;
}
//TODO remove settings dependency
@@ -228,5 +234,40 @@ public override IReadOnlyCollection DefaultHeadersToParse()
};
///
- public override IReadOnlyCollection ResponseBuilders { get; } = [new ElasticsearchResponseBuilder()];
+ public override IReadOnlyCollection ResponseBuilders { get; } =
+ [
+ new StringResponseBuilder(),
+ new DynamicResponseBuilder(),
+ new JsonResponseBuilder(),
+ new ElasticsearchStreamResponseBuilder(),
+#if NET10_0_OR_GREATER
+ new ElasticsearchPipeResponseBuilder(),
+#endif
+ new ElasticsearchResponseBuilder()
+ ];
+
+ ///
+ public override bool IsErrorContentType(string? contentType) =>
+ contentType is not null && (
+ contentType.StartsWith("application/json", StringComparison.OrdinalIgnoreCase) ||
+ contentType.StartsWith("application/vnd.elasticsearch+json", StringComparison.OrdinalIgnoreCase));
+
+ ///
+ public override ErrorResponse? TryExtractError(BoundConfiguration boundConfiguration, Stream responseStream)
+ {
+ try
+ {
+ var error = boundConfiguration.ConnectionSettings.RequestResponseSerializer
+ .Deserialize(responseStream);
+
+ if (error?.HasError() == true)
+ return error;
+ }
+ catch (System.Text.Json.JsonException)
+ {
+ // If the error deserialization fails, we'll let the builder try the original response type.
+ }
+
+ return null;
+ }
}
diff --git a/src/Elastic.Transport/Products/Elasticsearch/ElasticsearchResponse.cs b/src/Elastic.Transport/Products/Elasticsearch/ElasticsearchResponse.cs
deleted file mode 100644
index 2d305ee4..00000000
--- a/src/Elastic.Transport/Products/Elasticsearch/ElasticsearchResponse.cs
+++ /dev/null
@@ -1,113 +0,0 @@
-// Licensed to Elasticsearch B.V under one or more agreements.
-// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
-// See the LICENSE file in the project root for more information
-
-using System;
-using System.Collections.Generic;
-using System.Text;
-using System.Text.Json.Serialization;
-
-namespace Elastic.Transport.Products.Elasticsearch;
-
-///
-/// Base response for Elasticsearch responses.
-///
-public abstract class ElasticsearchResponse : TransportResponse
-{
- ///
- /// A collection of warnings returned from Elasticsearch.
- /// Used to provide server warnings, for example, when the request uses an API feature that is marked as deprecated.
- ///
- [JsonIgnore]
- public IEnumerable ElasticsearchWarnings
- {
- get
- {
- if (ApiCallDetails.ParsedHeaders is not null && ApiCallDetails.ParsedHeaders.TryGetValue("warning", out var warnings))
- {
- foreach (var warning in warnings)
- yield return warning;
- }
- }
- }
-
- ///
- ///
- ///
- [JsonIgnore]
- public string DebugInformation
- {
- get
- {
- var sb = new StringBuilder();
- _ = sb.Append($"{(!IsValidResponse ? "Inv" : "V")}alid Elasticsearch response built from a ");
- _ = sb.AppendLine(ApiCallDetails?.ToString().ToCamelCase() ??
- "null ApiCall which is highly exceptional, please open a bug if you see this");
- if (!IsValidResponse)
- DebugIsValid(sb);
-
- if (ApiCallDetails?.ParsedHeaders is not null && ApiCallDetails.ParsedHeaders.TryGetValue("warning", out var warnings))
- {
- _ = sb.AppendLine($"# Server indicated warnings:");
-
- foreach (var warning in warnings)
- _ = sb.AppendLine($"- {warning}");
- }
-
- if (ApiCallDetails != null)
- _ = Diagnostics.ResponseStatics.DebugInformationBuilder(ApiCallDetails, sb);
-
- return sb.ToString();
- }
- }
-
- ///
- /// Shortcut to test if the response is considered successful.
- ///
- /// A indicating success or failure.
- [JsonIgnore]
- public virtual bool IsValidResponse
- {
- get
- {
- var statusCode = ApiCallDetails?.HttpStatusCode;
-
- if (statusCode == 404)
- return false;
-
- return (ApiCallDetails?.HasSuccessfulStatusCodeAndExpectedContentType ?? false) && (!ElasticsearchServerError?.HasError() ?? true);
- }
- }
-
- ///
- ///
- ///
- [JsonIgnore]
- public ElasticsearchServerError? ElasticsearchServerError { get; internal set; }
-
- ///
- ///
- ///
- ///
- ///
- // TODO: We need nullable annotations here ideally as exception is not null when the return value is true.
- public bool TryGetOriginalException(out Exception? exception)
- {
- if (ApiCallDetails?.OriginalException is not null)
- {
- exception = ApiCallDetails.OriginalException;
- return true;
- }
-
- exception = null;
- return false;
- }
-
- /// Subclasses can override this to provide more information on why a call is not valid.
- protected virtual void DebugIsValid(StringBuilder sb) { }
-
- ///
- /// A custom implementation that returns
- ///
- public override string ToString() => DebugInformation;
-}
diff --git a/src/Elastic.Transport/Products/Elasticsearch/ElasticsearchResponseBuilder.cs b/src/Elastic.Transport/Products/Elasticsearch/ElasticsearchResponseBuilder.cs
deleted file mode 100644
index 6a1d2e4b..00000000
--- a/src/Elastic.Transport/Products/Elasticsearch/ElasticsearchResponseBuilder.cs
+++ /dev/null
@@ -1,110 +0,0 @@
-// Licensed to Elasticsearch B.V under one or more agreements.
-// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
-// See the LICENSE file in the project root for more information
-
-using System;
-using System.Diagnostics;
-using System.IO;
-using System.Linq;
-using System.Text.Json;
-using System.Threading;
-using System.Threading.Tasks;
-using Elastic.Transport.Diagnostics;
-
-namespace Elastic.Transport.Products.Elasticsearch;
-
-internal sealed class ElasticsearchResponseBuilder : IResponseBuilder
-{
- bool IResponseBuilder.CanBuild() => true;
-
- public TResponse? Build(ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration,
- Stream responseStream, string contentType, long contentLength)
- where TResponse : TransportResponse, new() =>
- SetBodyCoreAsync(false, apiCallDetails, boundConfiguration, responseStream).EnsureCompleted();
-
- public Task BuildAsync(
- ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration, Stream responseStream, string contentType, long contentLength,
- CancellationToken cancellationToken) where TResponse : TransportResponse, new() =>
- SetBodyCoreAsync(true, apiCallDetails, boundConfiguration, responseStream, cancellationToken).AsTask();
-
- private static async ValueTask SetBodyCoreAsync(bool isAsync,
- ApiCallDetails details, BoundConfiguration boundConfiguration, Stream responseStream,
- CancellationToken cancellationToken = default)
- where TResponse : TransportResponse, new()
- {
- TResponse? response = null;
-
- if (details.HttpStatusCode.HasValue &&
- boundConfiguration.SkipDeserializationForStatusCodes.Contains(details.HttpStatusCode.Value))
- {
- return response;
- }
-
- try
- {
- if (details.HttpStatusCode > 399)
- {
- var ownsStream = false;
-
- if (!responseStream.CanSeek)
- {
- var inMemoryStream = boundConfiguration.MemoryStreamFactory.Create();
- await responseStream.CopyToAsync(inMemoryStream, BufferedResponseHelpers.BufferSize, cancellationToken).ConfigureAwait(false);
- details.ResponseBodyInBytes = BufferedResponseHelpers.SwapStreams(ref responseStream, ref inMemoryStream);
- ownsStream = true;
- }
-
- if (TryGetError(boundConfiguration, responseStream, out var error) && error?.HasError() == true)
- {
- response = new TResponse();
-
- if (response is ElasticsearchResponse elasticResponse)
- elasticResponse.ElasticsearchServerError = error;
-
- if (ownsStream)
- responseStream.Dispose();
-
- return response;
- }
-
- responseStream.Position = 0;
- }
-
- var beforeTicks = Stopwatch.GetTimestamp();
-
- response = isAsync
- ? await boundConfiguration.ConnectionSettings.RequestResponseSerializer.DeserializeAsync(responseStream, cancellationToken).ConfigureAwait(false)
- : boundConfiguration.ConnectionSettings.RequestResponseSerializer.Deserialize(responseStream);
-
- var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
-
- if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
- _ = (Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs));
-
- return response;
- }
- catch (JsonException ex) when (ex.Message.Contains("The input does not contain any JSON tokens"))
- {
- return response;
- }
- }
-
- private static bool TryGetError(BoundConfiguration boundConfiguration, Stream responseStream, out ElasticsearchServerError? error)
- {
- Debug.Assert(responseStream.CanSeek);
-
- error = null;
-
- try
- {
- error = boundConfiguration.ConnectionSettings.RequestResponseSerializer.Deserialize(responseStream);
- return error is not null;
- }
- catch (JsonException)
- {
- // Empty catch as we'll try the original response type if the error serialization fails
- }
-
- return false;
- }
-}
diff --git a/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchDynamicResponse.cs b/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchDynamicResponse.cs
new file mode 100644
index 00000000..8e014536
--- /dev/null
+++ b/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchDynamicResponse.cs
@@ -0,0 +1,41 @@
+// Licensed to Elasticsearch B.V under one or more agreements.
+// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information
+
+using System;
+using System.Collections.Generic;
+
+namespace Elastic.Transport.Products.Elasticsearch;
+
+///
+/// A response that exposes the body as with Elasticsearch error handling.
+/// Provides , ,
+/// and in addition to dynamic JSON traversal.
+///
+public sealed class ElasticsearchDynamicResponse : DynamicResponseBase, IElasticsearchResponse
+{
+ ///
+ public ElasticsearchDynamicResponse() { }
+
+ ///
+ public ElasticsearchDynamicResponse(DynamicDictionary dictionary) : base(dictionary) { }
+
+ ///
+ public ElasticsearchServerError? ElasticsearchServerError => ElasticsearchResponseHelper.GetElasticsearchError(ApiCallDetails);
+
+ ///
+ public bool IsValidResponse => ElasticsearchResponseHelper.IsValidResponse(ApiCallDetails);
+
+ ///
+ public IEnumerable ElasticsearchWarnings => ElasticsearchResponseHelper.GetElasticsearchWarnings(ApiCallDetails);
+
+ ///
+ public string DebugInformation => ElasticsearchResponseHelper.GetDebugInformation(IsValidResponse, ApiCallDetails);
+
+ ///
+ public bool TryGetOriginalException(out Exception? exception) =>
+ ElasticsearchResponseHelper.TryGetOriginalException(ApiCallDetails, out exception);
+
+ ///
+ public override string ToString() => DebugInformation;
+}
diff --git a/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchJsonResponse.cs b/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchJsonResponse.cs
new file mode 100644
index 00000000..13e42904
--- /dev/null
+++ b/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchJsonResponse.cs
@@ -0,0 +1,42 @@
+// Licensed to Elasticsearch B.V under one or more agreements.
+// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information
+
+using System;
+using System.Collections.Generic;
+using System.Text.Json.Nodes;
+
+namespace Elastic.Transport.Products.Elasticsearch;
+
+///
+/// A response backed by with Elasticsearch error handling.
+/// Provides , ,
+/// and in addition to JSON DOM access.
+///
+public sealed class ElasticsearchJsonResponse : JsonResponseBase, IElasticsearchResponse
+{
+ ///
+ public ElasticsearchJsonResponse() { }
+
+ ///
+ public ElasticsearchJsonResponse(JsonNode node) : base(node) { }
+
+ ///
+ public ElasticsearchServerError? ElasticsearchServerError => ElasticsearchResponseHelper.GetElasticsearchError(ApiCallDetails);
+
+ ///
+ public bool IsValidResponse => ElasticsearchResponseHelper.IsValidResponse(ApiCallDetails);
+
+ ///
+ public IEnumerable ElasticsearchWarnings => ElasticsearchResponseHelper.GetElasticsearchWarnings(ApiCallDetails);
+
+ ///
+ public string DebugInformation => ElasticsearchResponseHelper.GetDebugInformation(IsValidResponse, ApiCallDetails);
+
+ ///
+ public bool TryGetOriginalException(out Exception? exception) =>
+ ElasticsearchResponseHelper.TryGetOriginalException(ApiCallDetails, out exception);
+
+ ///
+ public override string ToString() => DebugInformation;
+}
diff --git a/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchPipeResponse.cs b/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchPipeResponse.cs
new file mode 100644
index 00000000..e815e9c5
--- /dev/null
+++ b/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchPipeResponse.cs
@@ -0,0 +1,53 @@
+// Licensed to Elasticsearch B.V under one or more agreements.
+// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information
+
+#if NET10_0_OR_GREATER
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.IO.Pipelines;
+
+namespace Elastic.Transport.Products.Elasticsearch;
+
+///
+/// A response that exposes the response as a with Elasticsearch error handling.
+///
+/// MUST be disposed after use to ensure the HTTP connection is freed for reuse.
+///
+/// Provides , ,
+/// and in addition to the pipe reader body.
+///
+public sealed class ElasticsearchPipeResponse : PipeResponseBase, IElasticsearchResponse
+{
+ ///
+ public ElasticsearchPipeResponse() : this(Stream.Null, string.Empty) { }
+
+ ///
+ public ElasticsearchPipeResponse(Stream responseStream, string? contentType) : base(responseStream, contentType) { }
+
+ ///
+ /// The response body as a .
+ ///
+ public PipeReader Body => Pipe;
+
+ ///
+ public ElasticsearchServerError? ElasticsearchServerError => ElasticsearchResponseHelper.GetElasticsearchError(ApiCallDetails);
+
+ ///
+ public bool IsValidResponse => ElasticsearchResponseHelper.IsValidResponse(ApiCallDetails);
+
+ ///
+ public IEnumerable ElasticsearchWarnings => ElasticsearchResponseHelper.GetElasticsearchWarnings(ApiCallDetails);
+
+ ///
+ public string DebugInformation => ElasticsearchResponseHelper.GetDebugInformation(IsValidResponse, ApiCallDetails);
+
+ ///
+ public bool TryGetOriginalException(out Exception? exception) =>
+ ElasticsearchResponseHelper.TryGetOriginalException(ApiCallDetails, out exception);
+
+ ///
+ public override string ToString() => DebugInformation;
+}
+#endif
diff --git a/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchResponse.cs b/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchResponse.cs
new file mode 100644
index 00000000..a777f6f3
--- /dev/null
+++ b/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchResponse.cs
@@ -0,0 +1,48 @@
+// Licensed to Elasticsearch B.V under one or more agreements.
+// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Text.Json.Serialization;
+
+namespace Elastic.Transport.Products.Elasticsearch;
+
+///
+/// Base response for Elasticsearch responses.
+///
+public abstract class ElasticsearchResponse : TransportResponse, IElasticsearchResponse
+{
+ ///
+ [JsonIgnore]
+ public IEnumerable ElasticsearchWarnings =>
+ ElasticsearchResponseHelper.GetElasticsearchWarnings(ApiCallDetails);
+
+ ///
+ [JsonIgnore]
+ public string DebugInformation =>
+ ElasticsearchResponseHelper.GetDebugInformation(IsValidResponse, ApiCallDetails);
+
+ ///
+ [JsonIgnore]
+ public virtual bool IsValidResponse =>
+ ElasticsearchResponseHelper.IsValidResponse(ApiCallDetails);
+
+ ///
+ [JsonIgnore]
+ public ElasticsearchServerError? ElasticsearchServerError =>
+ ElasticsearchResponseHelper.GetElasticsearchError(ApiCallDetails);
+
+ ///
+ public bool TryGetOriginalException(out Exception? exception) =>
+ ElasticsearchResponseHelper.TryGetOriginalException(ApiCallDetails, out exception);
+
+ /// Subclasses can override this to provide more information on why a call is not valid.
+ protected virtual void DebugIsValid(StringBuilder sb) { }
+
+ ///
+ /// A custom implementation that returns
+ ///
+ public override string ToString() => DebugInformation;
+}
diff --git a/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchResponseBuilder.cs b/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchResponseBuilder.cs
new file mode 100644
index 00000000..ce950b01
--- /dev/null
+++ b/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchResponseBuilder.cs
@@ -0,0 +1,65 @@
+// Licensed to Elasticsearch B.V under one or more agreements.
+// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information
+
+using System.Diagnostics;
+using System.IO;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Elastic.Transport.Diagnostics;
+
+namespace Elastic.Transport.Products.Elasticsearch;
+
+///
+/// Catch-all builder for subclasses.
+/// Handles JSON deserialization for strongly-typed response types.
+///
+/// Error extraction is handled by the via
+/// before this builder is invoked.
+///
+///
+internal sealed class ElasticsearchResponseBuilder : IResponseBuilder
+{
+ bool IResponseBuilder.CanBuild() => true;
+
+ public TResponse? Build(ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration,
+ Stream responseStream, string contentType, long contentLength)
+ where TResponse : TransportResponse, new() =>
+ SetBodyCoreAsync(false, apiCallDetails, boundConfiguration, responseStream, cancellationToken: default).EnsureCompleted();
+
+ public Task BuildAsync(
+ ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration, Stream responseStream, string contentType, long contentLength,
+ CancellationToken cancellationToken) where TResponse : TransportResponse, new() =>
+ SetBodyCoreAsync(true, apiCallDetails, boundConfiguration, responseStream, cancellationToken).AsTask();
+
+ private static async ValueTask SetBodyCoreAsync(bool isAsync,
+ ApiCallDetails details, BoundConfiguration boundConfiguration, Stream responseStream,
+ CancellationToken cancellationToken)
+ where TResponse : TransportResponse, new()
+ {
+ TResponse? response = null;
+
+ if (details.HttpStatusCode.HasValue &&
+ boundConfiguration.SkipDeserializationForStatusCodes.Contains(details.HttpStatusCode.Value))
+ return response;
+
+ // If the factory already extracted a product error, skip body deserialization
+ // (the error is already available via ApiCallDetails.ProductError).
+ if (details.ProductError?.HasError() == true)
+ return response;
+
+ var beforeTicks = Stopwatch.GetTimestamp();
+
+ response = isAsync
+ ? await boundConfiguration.ConnectionSettings.RequestResponseSerializer.DeserializeAsync(responseStream, cancellationToken).ConfigureAwait(false)
+ : boundConfiguration.ConnectionSettings.RequestResponseSerializer.Deserialize(responseStream);
+
+ var deserializeResponseMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
+
+ if (deserializeResponseMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
+ _ = Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportDeserializeResponseMs, deserializeResponseMs);
+
+ return response;
+ }
+}
diff --git a/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchResponseHelper.cs b/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchResponseHelper.cs
new file mode 100644
index 00000000..3f14b98f
--- /dev/null
+++ b/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchResponseHelper.cs
@@ -0,0 +1,83 @@
+// Licensed to Elasticsearch B.V under one or more agreements.
+// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using Elastic.Transport.Diagnostics;
+
+namespace Elastic.Transport.Products.Elasticsearch;
+
+///
+/// Shared implementation logic for properties.
+/// Used by all concrete Elasticsearch response types to avoid duplication.
+///
+internal static class ElasticsearchResponseHelper
+{
+ public static bool IsValidResponse(ApiCallDetails? apiCallDetails)
+ {
+ if (apiCallDetails is null || !apiCallDetails.HasExpectedContentType)
+ return false;
+
+ // Elasticsearch returns 404 for valid responses in some cases (e.g. `GET /my-index/_doc/missing-doc-id`) but also for actual error cases like
+ // missing endpoints, missing indices (e.g. `GET /missing-index/_mapping`), etc.
+ // We consider all status codes >= 200 and < 300 valid by default. For 404, we assume "invalid" and try to parse the Elasticsearch
+ // error response from the body.
+ // A 404 status code without an error body indicates a valid response.
+
+ var serverError = GetElasticsearchError(apiCallDetails);
+ if (apiCallDetails.HttpStatusCode is 404)
+ return !serverError?.HasError() ?? true;
+
+ return apiCallDetails.HasSuccessfulStatusCode;
+ }
+
+ public static ElasticsearchServerError? GetElasticsearchError(ApiCallDetails? apiCallDetails) =>
+ apiCallDetails?.ProductError as ElasticsearchServerError;
+
+ public static IEnumerable GetElasticsearchWarnings(ApiCallDetails? apiCallDetails)
+ {
+ if (apiCallDetails?.ParsedHeaders is null || !apiCallDetails.ParsedHeaders.TryGetValue("warning", out var warnings))
+ yield break;
+
+ foreach (var warning in warnings)
+ yield return warning;
+ }
+
+ public static string GetDebugInformation(bool isValidResponse, ApiCallDetails? apiCallDetails)
+ {
+ var serverError = GetElasticsearchError(apiCallDetails);
+ var sb = new StringBuilder();
+ _ = sb.Append($"{(!isValidResponse ? "Inv" : "V")}alid Elasticsearch response built from a ");
+ _ = sb.AppendLine(apiCallDetails?.ToString().ToCamelCase() ??
+ "null ApiCall which is highly exceptional, please open a bug if you see this");
+ if (!isValidResponse && serverError?.HasError() == true)
+ _ = sb.AppendLine($"# ServerError: {serverError}");
+
+ if (apiCallDetails?.ParsedHeaders is not null && apiCallDetails.ParsedHeaders.TryGetValue("warning", out var warnings))
+ {
+ _ = sb.AppendLine("# Server indicated warnings:");
+
+ foreach (var warning in warnings)
+ _ = sb.AppendLine($"- {warning}");
+ }
+
+ if (apiCallDetails != null)
+ _ = ResponseStatics.DebugInformationBuilder(apiCallDetails, sb);
+
+ return sb.ToString();
+ }
+
+ public static bool TryGetOriginalException(ApiCallDetails? apiCallDetails, out Exception? exception)
+ {
+ if (apiCallDetails?.OriginalException is not null)
+ {
+ exception = apiCallDetails.OriginalException;
+ return true;
+ }
+
+ exception = null;
+ return false;
+ }
+}
diff --git a/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchSpecialResponseBuilders.cs b/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchSpecialResponseBuilders.cs
new file mode 100644
index 00000000..379cf736
--- /dev/null
+++ b/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchSpecialResponseBuilders.cs
@@ -0,0 +1,39 @@
+// Licensed to Elasticsearch B.V under one or more agreements.
+// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information
+
+using System.IO;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Elastic.Transport.Products.Elasticsearch;
+
+///
+/// Builds from a raw response stream.
+///
+internal sealed class ElasticsearchStreamResponseBuilder : TypedResponseBuilder
+{
+ protected override ElasticsearchStreamResponse Build(ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration,
+ Stream responseStream, string contentType, long contentLength) =>
+ new(responseStream, contentType);
+
+ protected override Task BuildAsync(ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration,
+ Stream responseStream, string contentType, long contentLength, CancellationToken cancellationToken = default) =>
+ Task.FromResult(new ElasticsearchStreamResponse(responseStream, contentType));
+}
+
+#if NET10_0_OR_GREATER
+///
+/// Builds from a raw response stream.
+///
+internal sealed class ElasticsearchPipeResponseBuilder : TypedResponseBuilder
+{
+ protected override ElasticsearchPipeResponse Build(ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration,
+ Stream responseStream, string contentType, long contentLength) =>
+ new(responseStream, contentType);
+
+ protected override Task BuildAsync(ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration,
+ Stream responseStream, string contentType, long contentLength, CancellationToken cancellationToken = default) =>
+ Task.FromResult(new ElasticsearchPipeResponse(responseStream, contentType));
+}
+#endif
diff --git a/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchStreamResponse.cs b/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchStreamResponse.cs
new file mode 100644
index 00000000..71a11a19
--- /dev/null
+++ b/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchStreamResponse.cs
@@ -0,0 +1,50 @@
+// Licensed to Elasticsearch B.V under one or more agreements.
+// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+
+namespace Elastic.Transport.Products.Elasticsearch;
+
+///
+/// A response that exposes the response as a with Elasticsearch error handling.
+///
+/// MUST be disposed after use to ensure the HTTP connection is freed for reuse.
+///
+/// Provides , ,
+/// and in addition to the stream body.
+///
+public sealed class ElasticsearchStreamResponse : StreamResponseBase, IElasticsearchResponse
+{
+ ///
+ public ElasticsearchStreamResponse() : base(Stream.Null) { }
+
+ ///
+ public ElasticsearchStreamResponse(Stream body, string? contentType) : base(body, contentType) { }
+
+ ///
+ /// The raw response stream.
+ ///
+ public Stream Body => Stream;
+
+ ///
+ public ElasticsearchServerError? ElasticsearchServerError => ElasticsearchResponseHelper.GetElasticsearchError(ApiCallDetails);
+
+ ///
+ public bool IsValidResponse => ElasticsearchResponseHelper.IsValidResponse(ApiCallDetails);
+
+ ///
+ public IEnumerable ElasticsearchWarnings => ElasticsearchResponseHelper.GetElasticsearchWarnings(ApiCallDetails);
+
+ ///
+ public string DebugInformation => ElasticsearchResponseHelper.GetDebugInformation(IsValidResponse, ApiCallDetails);
+
+ ///
+ public bool TryGetOriginalException(out Exception? exception) =>
+ ElasticsearchResponseHelper.TryGetOriginalException(ApiCallDetails, out exception);
+
+ ///
+ public override string ToString() => DebugInformation;
+}
diff --git a/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchStringResponse.cs b/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchStringResponse.cs
new file mode 100644
index 00000000..6a6f79ae
--- /dev/null
+++ b/src/Elastic.Transport/Products/Elasticsearch/Responses/ElasticsearchStringResponse.cs
@@ -0,0 +1,41 @@
+// Licensed to Elasticsearch B.V under one or more agreements.
+// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information
+
+using System;
+using System.Collections.Generic;
+
+namespace Elastic.Transport.Products.Elasticsearch;
+
+///
+/// A response that exposes the response body as a with Elasticsearch error handling.
+/// Provides , ,
+/// and in addition to the string body.
+///
+public sealed class ElasticsearchStringResponse : StringResponseBase, IElasticsearchResponse
+{
+ ///
+ public ElasticsearchStringResponse() { }
+
+ ///
+ public ElasticsearchStringResponse(string body) : base(body) { }
+
+ ///
+ public ElasticsearchServerError? ElasticsearchServerError => ElasticsearchResponseHelper.GetElasticsearchError(ApiCallDetails);
+
+ ///
+ public bool IsValidResponse => ElasticsearchResponseHelper.IsValidResponse(ApiCallDetails);
+
+ ///
+ public IEnumerable ElasticsearchWarnings => ElasticsearchResponseHelper.GetElasticsearchWarnings(ApiCallDetails);
+
+ ///
+ public string DebugInformation => ElasticsearchResponseHelper.GetDebugInformation(IsValidResponse, ApiCallDetails);
+
+ ///
+ public bool TryGetOriginalException(out Exception? exception) =>
+ ElasticsearchResponseHelper.TryGetOriginalException(ApiCallDetails, out exception);
+
+ ///
+ public override string ToString() => DebugInformation;
+}
diff --git a/src/Elastic.Transport/Products/Elasticsearch/Responses/IElasticsearchResponse.cs b/src/Elastic.Transport/Products/Elasticsearch/Responses/IElasticsearchResponse.cs
new file mode 100644
index 00000000..69ce5f8a
--- /dev/null
+++ b/src/Elastic.Transport/Products/Elasticsearch/Responses/IElasticsearchResponse.cs
@@ -0,0 +1,43 @@
+// Licensed to Elasticsearch B.V under one or more agreements.
+// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information
+
+using System;
+using System.Collections.Generic;
+
+namespace Elastic.Transport.Products.Elasticsearch;
+
+///
+/// Common interface for Elasticsearch responses, providing unified access to error handling,
+/// warnings, and response validation across all response types.
+///
+public interface IElasticsearchResponse
+{
+ ///
+ /// Whether the response is considered successful.
+ ///
+ bool IsValidResponse { get; }
+
+ ///
+ /// A collection of warnings returned from Elasticsearch.
+ /// Used to provide server warnings, for example, when the request uses an API feature that is marked as deprecated.
+ ///
+ IEnumerable ElasticsearchWarnings { get; }
+
+ ///
+ /// The server error, if any, returned by Elasticsearch.
+ ///
+ ElasticsearchServerError? ElasticsearchServerError { get; }
+
+ ///
+ /// Debug information about the request and response.
+ ///
+ string DebugInformation { get; }
+
+ ///
+ /// Attempts to retrieve the original exception that occurred during the request.
+ ///
+ /// The original exception, if one occurred.
+ /// true if an original exception was found; otherwise, false.
+ bool TryGetOriginalException(out Exception? exception);
+}
diff --git a/src/Elastic.Transport/Products/ProductRegistration.cs b/src/Elastic.Transport/Products/ProductRegistration.cs
index 9c161d51..849881fc 100644
--- a/src/Elastic.Transport/Products/ProductRegistration.cs
+++ b/src/Elastic.Transport/Products/ProductRegistration.cs
@@ -4,6 +4,7 @@
using System;
using System.Collections.Generic;
+using System.IO;
using System.Threading;
using System.Threading.Tasks;
@@ -138,7 +139,27 @@ public abstract class ProductRegistration
public abstract Dictionary? ParseOpenTelemetryAttributesFromApiCallDetails(ApiCallDetails callDetails);
///
- ///
+ ///
///
public virtual IReadOnlyCollection ResponseBuilders { get; } = [new DefaultResponseBuilder()];
+
+ ///
+ /// Determines whether the given response content-type indicates a product-specific error body
+ /// that can be deserialized by .
+ /// This is checked before attempting error extraction to avoid parsing non-error content
+ /// (e.g., HTML from a reverse proxy, or binary responses from non-JSON endpoints).
+ ///
+ /// The value of the Content-Type header from the response.
+ /// true if the content-type represents a parseable error response.
+ public virtual bool IsErrorContentType(string? contentType) => false;
+
+ ///
+ /// Attempts to extract a product-specific error from the response stream for error status codes.
+ /// Called by the response factory when the HTTP status code indicates an error (> 399).
+ /// The factory guarantees the stream is seekable and resets its position after this call.
+ ///
+ /// The bound configuration for the request.
+ /// A seekable stream containing the response body.
+ /// An if one was successfully extracted; otherwise null.
+ public virtual ErrorResponse? TryExtractError(BoundConfiguration boundConfiguration, Stream responseStream) => null;
}
diff --git a/src/Elastic.Transport/Responses/DefaultResponseFactory.cs b/src/Elastic.Transport/Responses/DefaultResponseFactory.cs
index b7beb8b5..b759b4de 100644
--- a/src/Elastic.Transport/Responses/DefaultResponseFactory.cs
+++ b/src/Elastic.Transport/Responses/DefaultResponseFactory.cs
@@ -9,6 +9,7 @@
using System.Net.NetworkInformation;
using System.Threading;
using System.Threading.Tasks;
+
using Elastic.Transport.Diagnostics;
using Elastic.Transport.Extensions;
@@ -43,7 +44,7 @@ public override TResponse Create(
Exception? ex,
int? statusCode,
Dictionary>? headers,
- Stream responseStream,
+ Stream? responseStream,
string? contentType,
long contentLength,
IReadOnlyDictionary? threadPoolStats,
@@ -59,7 +60,7 @@ public override Task CreateAsync(
Exception? ex,
int? statusCode,
Dictionary>? headers,
- Stream responseStream,
+ Stream? responseStream,
string? contentType,
long contentLength,
IReadOnlyDictionary? threadPoolStats,
@@ -76,56 +77,120 @@ private async ValueTask CreateCoreAsync(
Exception? ex,
int? statusCode,
Dictionary>? headers,
- Stream responseStream,
+ Stream? responseStream,
string? contentType,
long contentLength,
IReadOnlyDictionary? threadPoolStats,
IReadOnlyDictionary? tcpStats,
CancellationToken cancellationToken = default) where TResponse : TransportResponse, new()
{
+ TResponse? response = null;
var details = InitializeApiCallDetails(endpoint, boundConfiguration, postData, ex, statusCode, headers, contentType, threadPoolStats, tcpStats, contentLength);
- TResponse? response = null;
+ if (!MayHaveBody(statusCode, endpoint.Method, contentLength) || responseStream is null)
+ return FinalizeResponse();
- if (responseStream is not null && MayHaveBody(statusCode, endpoint.Method, contentLength)
- && TryResolveBuilder(boundConfiguration.ResponseBuilders, boundConfiguration.ProductResponseBuilders, out var builder))
- {
- var ownsStream = false;
+ var productRegistration = boundConfiguration.ConnectionSettings.ProductRegistration;
- // We always pre-buffer when there may be a body, even if the content type does not match.
- // That way, we ensure the caller can access the bytes themselves for "invalid" responses.
- if (boundConfiguration.DisableDirectStreaming)
- {
- var inMemoryStream = boundConfiguration.MemoryStreamFactory.Create();
+ var mayHaveErrorBody = statusCode.HasValue &&
+ !productRegistration.HttpStatusCodeClassifier(endpoint.Method, statusCode.Value);
+
+ var ownsStream = false;
+
+ TryResolveBuilder(boundConfiguration.ResponseBuilders, boundConfiguration.ProductResponseBuilders, out var builder);
- if (isAsync)
- await responseStream.CopyToAsync(inMemoryStream, BufferedResponseHelpers.BufferSize, cancellationToken).ConfigureAwait(false);
- else
- responseStream.CopyTo(inMemoryStream, BufferedResponseHelpers.BufferSize);
+ // We always pre-buffer when there may be a body, even if the content type does not match.
+ // That way, we ensure the caller can access the bytes themselves for "invalid" responses.
+ if (boundConfiguration.DisableDirectStreaming)
+ {
+ responseStream = await BufferResponseStreamAsync(isAsync, boundConfiguration, details, responseStream, cancellationToken).ConfigureAwait(false);
+ ownsStream = true;
+ }
- details.ResponseBodyInBytes = BufferedResponseHelpers.SwapStreams(ref responseStream, ref inMemoryStream);
+ // For non-success status codes, always buffer the response body so that callers
+ // can inspect it via ApiCallDetails.ResponseBodyInBytes — even when the content-type
+ // doesn't match the product's error format (e.g., HTML from a reverse proxy).
+ if (mayHaveErrorBody)
+ {
+ if (!responseStream.CanSeek)
+ {
+ responseStream = await BufferResponseStreamAsync(isAsync, boundConfiguration, details, responseStream, cancellationToken).ConfigureAwait(false);
ownsStream = true;
}
+ else if (details.ResponseBodyInBytes is null)
+ await CaptureResponseBytesAsync(isAsync, boundConfiguration, details, responseStream, cancellationToken).ConfigureAwait(false);
- // We only attempt to build a response when the Content-Type matches the accepted type.
- if (ValidateResponseContentType(boundConfiguration.Accept, contentType) && contentType is not null)
+ // Product-specific error extraction when the content-type matches.
+ if (productRegistration.IsErrorContentType(contentType))
{
- response = isAsync
- ? await builder.BuildAsync(details, boundConfiguration, responseStream, contentType, contentLength, cancellationToken).ConfigureAwait(false)
- : builder.Build(details, boundConfiguration, responseStream, contentType, contentLength);
+ details.ProductError = productRegistration.TryExtractError(boundConfiguration, responseStream);
+ responseStream.Position = 0;
}
+ }
+
+ // We only attempt to build a response when the Content-Type matches the accepted type.
+ if (builder is not null && ValidateResponseContentType(boundConfiguration.Accept, contentType) && contentType is not null)
+ {
+ response = isAsync
+ ? await builder.BuildAsync(details, boundConfiguration, responseStream, contentType, contentLength, cancellationToken).ConfigureAwait(false)
+ : builder.Build(details, boundConfiguration, responseStream, contentType, contentLength);
+ }
+
+ if (ownsStream && (response is null || !response.LeaveOpen))
+ responseStream.Dispose();
- if (ownsStream && (response is null || !response.LeaveOpen))
- responseStream?.Dispose();
+ return FinalizeResponse();
+
+ TResponse FinalizeResponse()
+ {
+ response ??= new TResponse();
+ response.ApiCallDetails = details;
+ return response;
}
+ }
+
+ ///
+ /// Buffers the response stream into a seekable in-memory stream and records the bytes in
+ /// . Returns the new in-memory stream.
+ ///
+ private static async ValueTask BufferResponseStreamAsync(bool isAsync,
+ BoundConfiguration boundConfiguration, ApiCallDetails details,
+ Stream responseStream, CancellationToken cancellationToken)
+ {
+ var inMemoryStream = boundConfiguration.MemoryStreamFactory.Create();
- response ??= new TResponse();
- response.ApiCallDetails = details;
- return response;
+ if (isAsync)
+ await responseStream.CopyToAsync(inMemoryStream, BufferedResponseHelpers.BufferSize, cancellationToken).ConfigureAwait(false);
+ else
+ responseStream.CopyTo(inMemoryStream, BufferedResponseHelpers.BufferSize);
+
+ details.ResponseBodyInBytes = BufferedResponseHelpers.SwapStreams(ref responseStream, ref inMemoryStream);
+ return responseStream;
+ }
+
+ ///
+ /// Captures the response bytes for diagnostics from a seekable stream without replacing it.
+ /// The stream position is restored after reading.
+ ///
+ private static async ValueTask CaptureResponseBytesAsync(bool isAsync,
+ BoundConfiguration boundConfiguration, ApiCallDetails details,
+ Stream responseStream, CancellationToken cancellationToken)
+ {
+ var position = responseStream.Position;
+ var inMemoryStream = boundConfiguration.MemoryStreamFactory.Create();
+
+ if (isAsync)
+ await responseStream.CopyToAsync(inMemoryStream, BufferedResponseHelpers.BufferSize, cancellationToken).ConfigureAwait(false);
+ else
+ responseStream.CopyTo(inMemoryStream, BufferedResponseHelpers.BufferSize);
+
+ details.ResponseBodyInBytes = inMemoryStream.ToArray();
+ responseStream.Position = position;
+ inMemoryStream.Dispose();
}
private bool TryResolveBuilder(IReadOnlyCollection responseBuilders,
- IReadOnlyCollection productResponseBuilders, out IResponseBuilder builder
+ IReadOnlyCollection productResponseBuilders, out IResponseBuilder? builder
) where TResponse : TransportResponse, new()
{
var type = typeof(TResponse);
@@ -136,22 +201,22 @@ private bool TryResolveBuilder(IReadOnlyCollection
return true;
}
- builder = default!;
+ builder = null;
if (TryFindResponseBuilder(type, responseBuilders, _resolvedBuilders, ref builder))
return true;
return TryFindResponseBuilder(type, productResponseBuilders, _resolvedBuilders, ref builder);
- static bool TryFindResponseBuilder(Type type, IEnumerable responseBuilders, ConcurrentDictionary resolvedBuilders, ref IResponseBuilder builder)
+ static bool TryFindResponseBuilder(Type type, IEnumerable responseBuilders, ConcurrentDictionary resolvedBuilders, ref IResponseBuilder? builder)
{
foreach (var potentialBuilder in responseBuilders)
{
- if (potentialBuilder.CanBuild())
- {
- _ = resolvedBuilders.TryAdd(type, potentialBuilder);
- builder = potentialBuilder;
- return true;
- }
+ if (!potentialBuilder.CanBuild())
+ continue;
+
+ _ = resolvedBuilders.TryAdd(type, potentialBuilder);
+ builder = potentialBuilder;
+ return true;
}
return false;
diff --git a/src/Elastic.Transport/Responses/Dynamic/DynamicResponse.cs b/src/Elastic.Transport/Responses/Dynamic/DynamicResponse.cs
index a021c4fe..17993687 100644
--- a/src/Elastic.Transport/Responses/Dynamic/DynamicResponse.cs
+++ b/src/Elastic.Transport/Responses/Dynamic/DynamicResponse.cs
@@ -2,41 +2,22 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
-using System.Diagnostics.CodeAnalysis;
-using System.Dynamic;
-
namespace Elastic.Transport;
///
/// A type of response that makes it easier to work with responses in an untyped fashion.
///
-/// It exposes the body as which is `dynamic` through
+/// It exposes the body as which is `dynamic` through
///
/// Since `dynamic` can be scary in .NET this response also exposes a safe traversal mechanism under
-/// which support an xpath'esque syntax to fish for values in the returned json.
+/// which support an xpath'esque syntax to fish for values in the returned json.
///
///
-public sealed class DynamicResponse : TransportResponse
+public sealed class DynamicResponse : DynamicResponseBase
{
///
- public DynamicResponse() => Dictionary = DynamicDictionary.Empty;
+ public DynamicResponse() { }
///
- public DynamicResponse(DynamicDictionary dictionary)
- {
- Body = dictionary;
- Dictionary = dictionary;
- }
-
- private DynamicDictionary Dictionary { get; }
-
- ///
- /// Traverses data using path notation.
- /// e.g some.deep.nested.json.path
- /// A special lookup is available for ANY key using _arbitrary_key_ e.g some.deep._arbitrary_key_.json.path which will traverse into the first key
- ///
- /// path into the stored object, keys are separated with a dot and the last key is returned as T
- ///
- /// T or default
- public T Get<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] T>(string path) => Dictionary.Get(path);
+ public DynamicResponse(DynamicDictionary dictionary) : base(dictionary) { }
}
diff --git a/src/Elastic.Transport/Responses/Dynamic/DynamicResponseBase.cs b/src/Elastic.Transport/Responses/Dynamic/DynamicResponseBase.cs
new file mode 100644
index 00000000..02626bb4
--- /dev/null
+++ b/src/Elastic.Transport/Responses/Dynamic/DynamicResponseBase.cs
@@ -0,0 +1,33 @@
+// Licensed to Elasticsearch B.V under one or more agreements.
+// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information
+
+using System.Diagnostics.CodeAnalysis;
+using System.Dynamic;
+
+namespace Elastic.Transport;
+
+///
+/// Base class for responses that expose the body as .
+///
+/// It exposes the body as which is dynamic through .
+/// Also provides safe traversal via using xpath-style dot-notation paths.
+///
+public abstract class DynamicResponseBase : TransportResponse
+{
+ ///
+ protected DynamicResponseBase() => Body = DynamicDictionary.Empty;
+
+ ///
+ protected DynamicResponseBase(DynamicDictionary dictionary) => Body = dictionary;
+
+ ///
+ /// Traverses data using path notation.
+ /// e.g some.deep.nested.json.path
+ /// A special lookup is available for ANY key using _arbitrary_key_ e.g some.deep._arbitrary_key_.json.path which will traverse into the first key
+ ///
+ /// path into the stored object, keys are separated with a dot and the last key is returned as T
+ ///
+ /// T or default
+ public T Get<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] T>(string path) => Body.Get(path);
+}
diff --git a/src/Elastic.Transport/Responses/Dynamic/DynamicResponseBuilder.cs b/src/Elastic.Transport/Responses/Dynamic/DynamicResponseBuilder.cs
index 0e25f58d..cf94e1ac 100644
--- a/src/Elastic.Transport/Responses/Dynamic/DynamicResponseBuilder.cs
+++ b/src/Elastic.Transport/Responses/Dynamic/DynamicResponseBuilder.cs
@@ -13,19 +13,17 @@
namespace Elastic.Transport;
-internal class DynamicResponseBuilder : TypedResponseBuilder
+internal class DynamicResponseBuilder : TypedResponseBuilder where T : DynamicResponseBase, new()
{
- protected override DynamicResponse Build(ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration, Stream responseStream, string contentType, long contentLength) =>
+ protected override T Build(ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration, Stream responseStream, string contentType, long contentLength) =>
BuildCoreAsync(false, apiCallDetails, boundConfiguration, responseStream, contentType, contentLength).EnsureCompleted();
- protected override Task BuildAsync(ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration, Stream responseStream, string contentType, long contentLength, CancellationToken cancellationToken = default) =>
+ protected override Task BuildAsync(ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration, Stream responseStream, string contentType, long contentLength, CancellationToken cancellationToken = default) =>
BuildCoreAsync(true, apiCallDetails, boundConfiguration, responseStream, contentType, contentLength, cancellationToken).AsTask();
- private static async ValueTask BuildCoreAsync(bool isAsync, ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration, Stream responseStream,
+ private static async ValueTask BuildCoreAsync(bool isAsync, ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration, Stream responseStream,
string contentType, long contentLength, CancellationToken cancellationToken = default)
{
- DynamicResponse response;
-
//if not json store the result under "body"
if (contentType == null || !contentType.StartsWith(BoundConfiguration.DefaultContentType, StringComparison.Ordinal))
{
@@ -41,7 +39,7 @@ private static async ValueTask BuildCoreAsync(bool isAsync, Api
["body"] = new DynamicValue(stringValue)
};
- return new DynamicResponse(dictionary);
+ return new T { Body = dictionary };
}
#if NET8_0_OR_GREATER
@@ -57,7 +55,7 @@ private static async ValueTask BuildCoreAsync(bool isAsync, Api
["body"] = new DynamicValue(stringValue)
};
- return new DynamicResponse(dictionary);
+ return new T { Body = dictionary };
}
#endif
@@ -82,14 +80,12 @@ private static async ValueTask BuildCoreAsync(bool isAsync, Api
["body"] = new DynamicValue(stringValue)
};
- response = new DynamicResponse(dictionary);
- }
- else
- {
- var body = LowLevelRequestResponseSerializer.Instance.Deserialize(responseStream);
- response = new DynamicResponse(body);
+ return new T { Body = dictionary };
}
- return response;
+ var body = LowLevelRequestResponseSerializer.Instance.Deserialize(responseStream);
+ return new T { Body = body };
}
}
+
+internal class DynamicResponseBuilder : DynamicResponseBuilder;
diff --git a/src/Elastic.Transport/Responses/HttpDetails/ApiCallDetails.cs b/src/Elastic.Transport/Responses/HttpDetails/ApiCallDetails.cs
index a2b67feb..102b6146 100644
--- a/src/Elastic.Transport/Responses/HttpDetails/ApiCallDetails.cs
+++ b/src/Elastic.Transport/Responses/HttpDetails/ApiCallDetails.cs
@@ -82,6 +82,12 @@ public string DebugInformation
///
public byte[]? ResponseBodyInBytes { get; internal set; }
+ ///
+ /// A product-specific error extracted from the response body, if any.
+ /// Populated by the for error status codes.
+ ///
+ public ErrorResponse? ProductError { get; internal set; }
+
///
/// The value of the Content-Type header in the response.
///
@@ -112,10 +118,9 @@ public string ResponseMimeType
internal bool SuccessOrKnownError =>
HasSuccessfulStatusCodeAndExpectedContentType
- || (HttpStatusCode >= 400
- && HttpStatusCode < 599
- && HttpStatusCode != 504 //Gateway timeout needs to be retried
- && HttpStatusCode != 503 //service unavailable needs to be retried
+ || (HttpStatusCode is >= 400 and < 599
+ && HttpStatusCode != 504 // Gateway timeout needs to be retried
+ && HttpStatusCode != 503 // Service unavailable needs to be retried
&& HttpStatusCode != 502
&& HasExpectedContentType);
diff --git a/src/Elastic.Transport/Responses/Json/JsonResponse.cs b/src/Elastic.Transport/Responses/Json/JsonResponse.cs
index 186b2f93..76a2793d 100644
--- a/src/Elastic.Transport/Responses/Json/JsonResponse.cs
+++ b/src/Elastic.Transport/Responses/Json/JsonResponse.cs
@@ -2,7 +2,6 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
-using System.Diagnostics.CodeAnalysis;
using System.Text.Json.Nodes;
namespace Elastic.Transport;
@@ -11,26 +10,13 @@ namespace Elastic.Transport;
/// A response type backed by from System.Text.Json.
///
/// Provides direct DOM access via (e.g. response.Body["hits"]["hits"][0])
-/// Also exposes a safe path traversal mechanism via using dot-notation paths.
+/// Also exposes a safe path traversal mechanism via using dot-notation paths.
///
-public sealed class JsonResponse : TransportResponse
+public sealed class JsonResponse : JsonResponseBase
{
///
public JsonResponse() { }
///
- public JsonResponse(JsonNode node) => Body = node;
-
- ///
- /// Traverses data using path notation.
- /// e.g some.deep.nested.json.path
- /// Supports bracket index syntax: hits.hits.[0]._source
- /// Supports first/last: hits.hits.[first()]._source, hits.hits._last_
- /// Supports arbitrary key: some._arbitrary_key_.value
- ///
- /// path into the stored object, keys are separated with a dot and the last key is returned as T
- ///
- /// T or default
- public T Get<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] T>(string path) =>
- JsonNodePathTraversal.Get(Body, path);
+ public JsonResponse(JsonNode node) : base(node) { }
}
diff --git a/src/Elastic.Transport/Responses/Json/JsonResponseBase.cs b/src/Elastic.Transport/Responses/Json/JsonResponseBase.cs
new file mode 100644
index 00000000..2e2af190
--- /dev/null
+++ b/src/Elastic.Transport/Responses/Json/JsonResponseBase.cs
@@ -0,0 +1,36 @@
+// Licensed to Elasticsearch B.V under one or more agreements.
+// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information
+
+using System.Diagnostics.CodeAnalysis;
+using System.Text.Json.Nodes;
+
+namespace Elastic.Transport;
+
+///
+/// Base class for responses backed by from System.Text.Json.
+///
+/// Provides direct DOM access via (e.g. response.Body["hits"]["hits"][0])
+/// Also exposes a safe path traversal mechanism via using dot-notation paths.
+///
+public abstract class JsonResponseBase : TransportResponse
+{
+ ///
+ protected JsonResponseBase() { }
+
+ ///
+ protected JsonResponseBase(JsonNode node) => Body = node;
+
+ ///
+ /// Traverses data using path notation.
+ /// e.g some.deep.nested.json.path
+ /// Supports bracket index syntax: hits.hits.[0]._source
+ /// Supports first/last: hits.hits.[first()]._source, hits.hits._last_
+ /// Supports arbitrary key: some._arbitrary_key_.value
+ ///
+ /// path into the stored object, keys are separated with a dot and the last key is returned as T
+ ///
+ /// T or default
+ public T Get<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] T>(string path) =>
+ JsonNodePathTraversal.Get(Body, path);
+}
diff --git a/src/Elastic.Transport/Responses/Json/JsonResponseBuilder.cs b/src/Elastic.Transport/Responses/Json/JsonResponseBuilder.cs
index fdc3fb98..092e9917 100644
--- a/src/Elastic.Transport/Responses/Json/JsonResponseBuilder.cs
+++ b/src/Elastic.Transport/Responses/Json/JsonResponseBuilder.cs
@@ -15,15 +15,15 @@
namespace Elastic.Transport;
-internal class JsonResponseBuilder : TypedResponseBuilder
+internal class JsonResponseBuilder : TypedResponseBuilder where T : JsonResponseBase, new()
{
- protected override JsonResponse Build(ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration, Stream responseStream, string contentType, long contentLength) =>
+ protected override T Build(ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration, Stream responseStream, string contentType, long contentLength) =>
BuildCoreAsync(false, apiCallDetails, boundConfiguration, responseStream, contentType, contentLength).EnsureCompleted();
- protected override Task BuildAsync(ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration, Stream responseStream, string contentType, long contentLength, CancellationToken cancellationToken = default) =>
+ protected override Task BuildAsync(ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration, Stream responseStream, string contentType, long contentLength, CancellationToken cancellationToken = default) =>
BuildCoreAsync(true, apiCallDetails, boundConfiguration, responseStream, contentType, contentLength, cancellationToken).AsTask();
- private static async ValueTask BuildCoreAsync(bool isAsync, ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration, Stream responseStream,
+ private static async ValueTask BuildCoreAsync(bool isAsync, ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration, Stream responseStream,
string contentType, long contentLength, CancellationToken cancellationToken = default)
{
// If not JSON, store the result under "body"
@@ -34,7 +34,7 @@ private static async ValueTask BuildCoreAsync(bool isAsync, ApiCal
if (apiCallDetails.ResponseBodyInBytes is not null)
{
stringValue = Encoding.UTF8.GetString(apiCallDetails.ResponseBodyInBytes);
- return new JsonResponse(new JsonObject { ["body"] = stringValue });
+ return new T { Body = new JsonObject { ["body"] = stringValue } };
}
#if NET8_0_OR_GREATER
@@ -44,7 +44,7 @@ private static async ValueTask BuildCoreAsync(bool isAsync, ApiCal
responseStream.ReadExactly(buffer, 0, (int)contentLength);
stringValue = Encoding.UTF8.GetString(buffer.AsSpan(0, (int)contentLength));
ArrayPool.Shared.Return(buffer);
- return new JsonResponse(new JsonObject { ["body"] = stringValue });
+ return new T { Body = new JsonObject { ["body"] = stringValue } };
}
#endif
@@ -64,7 +64,7 @@ private static async ValueTask BuildCoreAsync(bool isAsync, ApiCal
stringValue = sr.ReadToEnd();
}
- return new JsonResponse(new JsonObject { ["body"] = stringValue });
+ return new T { Body = new JsonObject { ["body"] = stringValue } };
}
// JSON content: parse into JsonNode
@@ -78,6 +78,11 @@ private static async ValueTask BuildCoreAsync(bool isAsync, ApiCal
node = JsonNode.Parse(responseStream);
}
- return node is not null ? new JsonResponse(node) : new JsonResponse();
+ var response = new T();
+ if (node is not null)
+ response.Body = node;
+ return response;
}
}
+
+internal class JsonResponseBuilder : JsonResponseBuilder;
diff --git a/src/Elastic.Transport/Responses/ResponseFactory.cs b/src/Elastic.Transport/Responses/ResponseFactory.cs
index 95ea4baa..d7ecb911 100644
--- a/src/Elastic.Transport/Responses/ResponseFactory.cs
+++ b/src/Elastic.Transport/Responses/ResponseFactory.cs
@@ -29,7 +29,7 @@ public abstract TResponse Create(
Exception? ex,
int? statusCode,
Dictionary>? headers,
- Stream responseStream,
+ Stream? responseStream,
string? contentType,
long contentLength,
IReadOnlyDictionary? threadPoolStats,
diff --git a/src/Elastic.Transport/Responses/Special/PipeResponse.cs b/src/Elastic.Transport/Responses/Special/PipeResponse.cs
index 0bf387b2..8d29a783 100644
--- a/src/Elastic.Transport/Responses/Special/PipeResponse.cs
+++ b/src/Elastic.Transport/Responses/Special/PipeResponse.cs
@@ -3,12 +3,8 @@
// See the LICENSE file in the project root for more information
#if NET10_0_OR_GREATER
-using System;
using System.IO;
using System.IO.Pipelines;
-using System.Threading;
-using System.Threading.Tasks;
-using Elastic.Transport.Extensions;
namespace Elastic.Transport;
@@ -22,131 +18,22 @@ namespace Elastic.Transport;
/// MUST be disposed after use to ensure the HTTP connection is freed for reuse.
///
///
-public sealed class PipeResponse : TransportResponse, IAsyncDisposable, IDisposable
+public sealed class PipeResponse : PipeResponseBase
{
- private readonly Stream _stream;
- private bool _disposed;
-
///
public PipeResponse() : this(Stream.Null, string.Empty) { }
///
- public PipeResponse(Stream responseStream, string? contentType)
- {
- responseStream.ThrowIfNull(nameof(responseStream));
- _stream = responseStream;
- ContentType = contentType ?? string.Empty;
- Body = PipeReader.Create(_stream, new StreamPipeReaderOptions(leaveOpen: false));
- }
-
- ///
- /// The MIME type of the response, if present.
- ///
- public string ContentType { get; }
+ public PipeResponse(Stream responseStream, string? contentType) : base(responseStream, contentType) { }
///
/// The response body as a .
///
- /// Can be used directly with
- /// or
+ /// Can be used directly with
+ /// or
/// for efficient deserialization without intermediate buffering.
///
///
- public PipeReader Body { get; }
-
- ///
- protected internal override bool LeaveOpen => true;
-
- ///
- /// Copies the response body directly to a .
- ///
- /// This is ideal for forwarding Elasticsearch responses directly to ASP.NET Core's
- /// HttpContext.Response.BodyWriter without intermediate buffering.
- ///
- ///
- /// The to write to (e.g., HttpContext.Response.BodyWriter)
- /// Cancellation token
- ///
- ///
- /// // In an ASP.NET Core minimal API or controller:
- /// app.MapGet("/search", async (HttpContext context, ITransport transport) =>
- /// {
- /// await using var response = await transport.GetAsync<PipeResponse>(path);
- /// context.Response.ContentType = response.ContentType;
- /// await response.CopyToAsync(context.Response.BodyWriter, context.RequestAborted);
- /// });
- ///
- ///
- public async Task CopyToAsync(PipeWriter destination, CancellationToken cancellationToken = default)
- {
- ObjectDisposedException.ThrowIf(_disposed, this);
-
- while (true)
- {
- var result = await Body.ReadAsync(cancellationToken).ConfigureAwait(false);
- var buffer = result.Buffer;
-
- try
- {
- foreach (var segment in buffer)
- {
- var destBuffer = destination.GetMemory(segment.Length);
- segment.CopyTo(destBuffer);
- destination.Advance(segment.Length);
- }
-
- var flushResult = await destination.FlushAsync(cancellationToken).ConfigureAwait(false);
-
- if (result.IsCompleted || flushResult.IsCompleted)
- break;
- }
- finally
- {
- Body.AdvanceTo(buffer.End);
- }
- }
- }
-
- ///
- /// Disposes the underlying stream and completes the .
- ///
- public void Dispose()
- {
- if (_disposed)
- return;
-
- Body.Complete();
- _stream.Dispose();
-
- if (LinkedDisposables is not null)
- {
- foreach (var disposable in LinkedDisposables)
- disposable?.Dispose();
- }
-
- _disposed = true;
- GC.SuppressFinalize(this);
- }
-
- ///
- /// Asynchronously disposes the underlying stream and completes the .
- ///
- public async ValueTask DisposeAsync()
- {
- if (_disposed)
- return;
-
- await Body.CompleteAsync().ConfigureAwait(false);
- await _stream.DisposeAsync().ConfigureAwait(false);
-
- if (LinkedDisposables is not null)
- {
- foreach (var disposable in LinkedDisposables)
- disposable?.Dispose();
- }
-
- _disposed = true;
- GC.SuppressFinalize(this);
- }
+ public PipeReader Body => Pipe;
}
#endif
diff --git a/src/Elastic.Transport/Responses/Special/PipeResponseBase.cs b/src/Elastic.Transport/Responses/Special/PipeResponseBase.cs
new file mode 100644
index 00000000..d527ce9d
--- /dev/null
+++ b/src/Elastic.Transport/Responses/Special/PipeResponseBase.cs
@@ -0,0 +1,144 @@
+// Licensed to Elasticsearch B.V under one or more agreements.
+// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information
+
+#if NET10_0_OR_GREATER
+using System;
+using System.IO;
+using System.IO.Pipelines;
+using System.Threading;
+using System.Threading.Tasks;
+using Elastic.Transport.Extensions;
+
+namespace Elastic.Transport;
+
+///
+/// Base class for responses that expose the response as a .
+///
+/// This leverages .NET 10's support for deserialization
+/// directly from , avoiding intermediate Stream conversions.
+///
+///
+/// MUST be disposed after use to ensure the HTTP connection is freed for reuse.
+///
+///
+public abstract class PipeResponseBase : TransportResponse, IAsyncDisposable, IDisposable
+{
+ private readonly Stream _stream;
+ private bool _disposed;
+
+ ///
+ protected PipeResponseBase(Stream responseStream, string? contentType)
+ {
+ responseStream.ThrowIfNull(nameof(responseStream));
+ _stream = responseStream;
+ ContentType = contentType ?? string.Empty;
+ Pipe = PipeReader.Create(_stream, new StreamPipeReaderOptions(leaveOpen: false));
+ }
+
+ ///
+ /// The MIME type of the response, if present.
+ ///
+ public string ContentType { get; }
+
+ ///
+ /// The response body as a .
+ ///
+ protected PipeReader Pipe { get; }
+
+ ///
+ protected internal override bool LeaveOpen => true;
+
+ ///
+ /// Copies the response body directly to a .
+ ///
+ /// This is ideal for forwarding Elasticsearch responses directly to ASP.NET Core's
+ /// HttpContext.Response.BodyWriter without intermediate buffering.
+ ///
+ ///
+ /// The to write to (e.g., HttpContext.Response.BodyWriter)
+ /// Cancellation token
+ ///
+ ///
+ /// // In an ASP.NET Core minimal API or controller:
+ /// app.MapGet("/search", async (HttpContext context, ITransport transport) =>
+ /// {
+ /// await using var response = await transport.GetAsync<PipeResponse>(path);
+ /// context.Response.ContentType = response.ContentType;
+ /// await response.CopyToAsync(context.Response.BodyWriter, context.RequestAborted);
+ /// });
+ ///
+ ///
+ public async Task CopyToAsync(PipeWriter destination, CancellationToken cancellationToken = default)
+ {
+ ObjectDisposedException.ThrowIf(_disposed, this);
+
+ while (true)
+ {
+ var result = await Pipe.ReadAsync(cancellationToken).ConfigureAwait(false);
+ var buffer = result.Buffer;
+
+ try
+ {
+ foreach (var segment in buffer)
+ {
+ var destBuffer = destination.GetMemory(segment.Length);
+ segment.CopyTo(destBuffer);
+ destination.Advance(segment.Length);
+ }
+
+ var flushResult = await destination.FlushAsync(cancellationToken).ConfigureAwait(false);
+
+ if (result.IsCompleted || flushResult.IsCompleted)
+ break;
+ }
+ finally
+ {
+ Pipe.AdvanceTo(buffer.End);
+ }
+ }
+ }
+
+ ///
+ /// Disposes the underlying stream and completes the .
+ ///
+ public void Dispose()
+ {
+ if (_disposed)
+ return;
+
+ Pipe.Complete();
+ _stream.Dispose();
+
+ if (LinkedDisposables is not null)
+ {
+ foreach (var disposable in LinkedDisposables)
+ disposable?.Dispose();
+ }
+
+ _disposed = true;
+ GC.SuppressFinalize(this);
+ }
+
+ ///
+ /// Asynchronously disposes the underlying stream and completes the .
+ ///
+ public async ValueTask DisposeAsync()
+ {
+ if (_disposed)
+ return;
+
+ await Pipe.CompleteAsync().ConfigureAwait(false);
+ await _stream.DisposeAsync().ConfigureAwait(false);
+
+ if (LinkedDisposables is not null)
+ {
+ foreach (var disposable in LinkedDisposables)
+ disposable?.Dispose();
+ }
+
+ _disposed = true;
+ GC.SuppressFinalize(this);
+ }
+}
+#endif
diff --git a/src/Elastic.Transport/Responses/Special/StreamResponse.cs b/src/Elastic.Transport/Responses/Special/StreamResponse.cs
index fed6df0f..edd397d2 100644
--- a/src/Elastic.Transport/Responses/Special/StreamResponse.cs
+++ b/src/Elastic.Transport/Responses/Special/StreamResponse.cs
@@ -16,17 +16,10 @@ namespace Elastic.Transport;
public sealed class StreamResponse : StreamResponseBase, IDisposable
{
///
- public StreamResponse() : base(Stream.Null) =>
- ContentType = string.Empty;
+ public StreamResponse() : base(Stream.Null) { }
///
- public StreamResponse(Stream body, string? contentType) : base(body) =>
- ContentType = contentType ?? string.Empty;
-
- ///
- /// The MIME type of the response, if present.
- ///
- public string ContentType { get; }
+ public StreamResponse(Stream body, string? contentType) : base(body, contentType) { }
///
/// The raw response stream.
diff --git a/src/Elastic.Transport/Responses/Special/StreamResponseBase.cs b/src/Elastic.Transport/Responses/Special/StreamResponseBase.cs
index 28e48daf..ad4921c1 100644
--- a/src/Elastic.Transport/Responses/Special/StreamResponseBase.cs
+++ b/src/Elastic.Transport/Responses/Special/StreamResponseBase.cs
@@ -24,16 +24,22 @@ public abstract class StreamResponseBase : TransportResponse, IDisposable
///
protected Stream Stream { get; }
+ ///
+ /// The MIME type of the response, if present.
+ ///
+ public string ContentType { get; }
+
///
/// Indicates that the response has been disposed and it is not longer safe to access the stream.
///
protected bool Disposed { get; private set; }
///
- public StreamResponseBase(Stream responseStream)
+ protected StreamResponseBase(Stream responseStream, string? contentType = null)
{
responseStream.ThrowIfNull(nameof(responseStream));
Stream = responseStream;
+ ContentType = contentType ?? string.Empty;
}
///
diff --git a/src/Elastic.Transport/Responses/Special/StringResponse.cs b/src/Elastic.Transport/Responses/Special/StringResponse.cs
index 879fa5be..2bbb28fd 100644
--- a/src/Elastic.Transport/Responses/Special/StringResponse.cs
+++ b/src/Elastic.Transport/Responses/Special/StringResponse.cs
@@ -7,11 +7,11 @@ namespace Elastic.Transport;
///
/// A response that exposes the response as .
///
-public sealed class StringResponse : TransportResponse
+public sealed class StringResponse : StringResponseBase
{
///
- public StringResponse() => Body = string.Empty;
+ public StringResponse() { }
///
- public StringResponse(string body) => Body = body;
+ public StringResponse(string body) : base(body) { }
}
diff --git a/src/Elastic.Transport/Responses/Special/StringResponseBase.cs b/src/Elastic.Transport/Responses/Special/StringResponseBase.cs
new file mode 100644
index 00000000..d9492c59
--- /dev/null
+++ b/src/Elastic.Transport/Responses/Special/StringResponseBase.cs
@@ -0,0 +1,17 @@
+// Licensed to Elasticsearch B.V under one or more agreements.
+// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information
+
+namespace Elastic.Transport;
+
+///
+/// Base class for responses that expose the response body as a .
+///
+public abstract class StringResponseBase : TransportResponse
+{
+ ///
+ protected StringResponseBase() => Body = string.Empty;
+
+ ///
+ protected StringResponseBase(string body) => Body = body;
+}
diff --git a/src/Elastic.Transport/Responses/Special/StringResponseBuilder.cs b/src/Elastic.Transport/Responses/Special/StringResponseBuilder.cs
index dada028c..06433483 100644
--- a/src/Elastic.Transport/Responses/Special/StringResponseBuilder.cs
+++ b/src/Elastic.Transport/Responses/Special/StringResponseBuilder.cs
@@ -14,16 +14,16 @@
namespace Elastic.Transport;
-internal class StringResponseBuilder : TypedResponseBuilder
+internal class StringResponseBuilder : TypedResponseBuilder where T : StringResponseBase, new()
{
- protected override StringResponse Build(ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration, Stream responseStream, string contentType, long contentLength)
+ protected override T Build(ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration, Stream responseStream, string contentType, long contentLength)
{
string responseString;
if (apiCallDetails.ResponseBodyInBytes is not null)
{
responseString = Encoding.UTF8.GetString(apiCallDetails.ResponseBodyInBytes);
- return new StringResponse(responseString);
+ return new T { Body = responseString };
}
#if NET8_0_OR_GREATER
@@ -33,16 +33,16 @@ protected override StringResponse Build(ApiCallDetails apiCallDetails, BoundConf
responseStream.ReadExactly(buffer, 0, (int)contentLength);
responseString = Encoding.UTF8.GetString(buffer.AsSpan(0, (int)contentLength));
ArrayPool.Shared.Return(buffer);
- return new StringResponse(responseString);
+ return new T { Body = responseString };
}
#endif
var sr = new StreamReader(responseStream);
responseString = sr.ReadToEnd();
- return new StringResponse(responseString);
+ return new T { Body = responseString };
}
- protected override async Task BuildAsync(ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration, Stream responseStream, string contentType, long contentLength,
+ protected override async Task BuildAsync(ApiCallDetails apiCallDetails, BoundConfiguration boundConfiguration, Stream responseStream, string contentType, long contentLength,
CancellationToken cancellationToken = default)
{
string responseString;
@@ -50,7 +50,7 @@ protected override async Task BuildAsync(ApiCallDetails apiCallD
if (apiCallDetails.ResponseBodyInBytes is not null)
{
responseString = Encoding.UTF8.GetString(apiCallDetails.ResponseBodyInBytes);
- return new StringResponse(responseString);
+ return new T { Body = responseString };
}
#if NET8_0_OR_GREATER
@@ -60,7 +60,7 @@ protected override async Task BuildAsync(ApiCallDetails apiCallD
await responseStream.ReadExactlyAsync(buffer, 0, (int)contentLength, cancellationToken).ConfigureAwait(false);
responseString = Encoding.UTF8.GetString(buffer.AsSpan(0, (int)contentLength));
ArrayPool.Shared.Return(buffer);
- return new StringResponse(responseString);
+ return new T { Body = responseString };
}
#endif
@@ -70,6 +70,8 @@ protected override async Task BuildAsync(ApiCallDetails apiCallD
#else
responseString = await sr.ReadToEndAsync().ConfigureAwait(false);
#endif
- return new StringResponse(responseString);
+ return new T { Body = responseString };
}
}
+
+internal class StringResponseBuilder : StringResponseBuilder;