Skip to content

Commit b778158

Browse files
committed
Merge pull request #1060 from elasticsearch/refactor/transport-stream-handling
Refactor transport layer and connection pool 401 handling
2 parents a53a8e7 + 089d5f1 commit b778158

File tree

52 files changed

+3385
-1171
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+3385
-1171
lines changed

src/Elasticsearch.Net/Connection/Configuration/ConnectionConfiguration.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ public T SetTimeout(int timeout)
250250
/// <summary>
251251
/// This is a separate timeout for Ping() requests. A ping should fail as fast as possible.
252252
/// </summary>
253-
/// <param name="timeout">The ping timeout in milliseconds defaults to 50</param>
253+
/// <param name="timeout">The ping timeout in milliseconds defaults to 200</param>
254254
public T SetPingTimeout(int timeout)
255255
{
256256
this._pingTimeout = timeout;
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using Elasticsearch.Net.Connection.Configuration;
2+
using Elasticsearch.Net.Connection.RequestState;
3+
using System;
4+
using System.Collections.Generic;
5+
using System.IO;
6+
using System.Linq;
7+
using System.Text;
8+
using System.Threading.Tasks;
9+
10+
namespace Elasticsearch.Net.Connection
11+
{
12+
internal interface ITransportDelegator
13+
{
14+
bool Ping(ITransportRequestState requestState);
15+
Task<bool> PingAsync(ITransportRequestState requestState);
16+
IList<Uri> Sniff(ITransportRequestState ownerState = null);
17+
void SniffClusterState(ITransportRequestState requestState = null);
18+
void SniffOnStaleClusterState(ITransportRequestState requestState);
19+
void SniffOnConnectionFailure(ITransportRequestState requestState);
20+
21+
/// <summary>
22+
/// Returns either the fixed maximum set on the connection configuration settings or the number of nodes
23+
/// </summary>
24+
/// <param name="requestState"></param>
25+
int GetMaximumRetries(IRequestConfiguration requestConfiguration);
26+
27+
bool SniffingDisabled(IRequestConfiguration requestConfiguration);
28+
bool SniffOnFaultDiscoveredMoreNodes(ITransportRequestState requestState, int retried, ElasticsearchResponse<Stream> streamResponse);
29+
30+
/// <summary>
31+
/// Selects next node uri on request state
32+
/// </summary>
33+
/// <returns>bool hint whether the new current node needs to pinged first</returns>
34+
bool SelectNextNode(ITransportRequestState requestState);
35+
}
36+
}
Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.IO;
4+
using System.Net.NetworkInformation;
5+
using System.Text;
6+
using System.Threading.Tasks;
7+
using Elasticsearch.Net.Connection.Configuration;
8+
using Elasticsearch.Net.Connection.RequestState;
9+
using Elasticsearch.Net.ConnectionPool;
10+
using Elasticsearch.Net.Exceptions;
11+
using Elasticsearch.Net.Providers;
12+
using Elasticsearch.Net.Serialization;
13+
14+
namespace Elasticsearch.Net.Connection.RequestHandlers
15+
{
16+
internal class RequestHandler : RequestHandlerBase
17+
{
18+
public RequestHandler(
19+
IConnectionConfigurationValues settings,
20+
IConnectionPool connectionPool,
21+
IConnection connection,
22+
IElasticsearchSerializer serializer,
23+
IMemoryStreamProvider memoryStreamProvider,
24+
ITransportDelegator delegator)
25+
: base(settings, connection, connectionPool, serializer, memoryStreamProvider, delegator)
26+
{
27+
}
28+
29+
public ElasticsearchResponse<T> Request<T>(TransportRequestState<T> requestState, object data = null)
30+
{
31+
var postData = PostData(data);
32+
requestState.TickSerialization(postData);
33+
34+
var response = this.DoRequest<T>(requestState);
35+
requestState.SetResult(response);
36+
return response;
37+
}
38+
39+
private ElasticsearchResponse<T> SelectNextNode<T>(TransportRequestState<T> requestState)
40+
{
41+
// Select the next node to hit and signal whether the selected node needs a ping
42+
var nodeRequiresPinging = this._delegator.SelectNextNode(requestState);
43+
if (!nodeRequiresPinging) return null;
44+
45+
var pingSuccess = this._delegator.Ping(requestState);
46+
// If ping is not successful retry request on the next node the connection pool selects
47+
return !pingSuccess ? RetryRequest<T>(requestState) : null;
48+
}
49+
50+
private ElasticsearchResponse<Stream> DoElasticsearchCall<T>(TransportRequestState<T> requestState)
51+
{
52+
// Do the actual request by calling into IConnection
53+
// We wrap it in a IRequestTimings to audit the request
54+
ElasticsearchResponse<Stream> streamResponse;
55+
using (var requestAudit = requestState.InitiateRequest(RequestType.ElasticsearchCall))
56+
{
57+
streamResponse = this.CallInToConnection(requestState);
58+
requestAudit.Finish(streamResponse.Success, streamResponse.HttpStatusCode);
59+
}
60+
return streamResponse;
61+
}
62+
63+
private ElasticsearchResponse<T> ReturnStreamOrVoidResponse<T>(
64+
TransportRequestState<T> requestState, ElasticsearchResponse<Stream> streamResponse)
65+
{
66+
// If the response never recieved a status code and has a caught exception make sure we throw it
67+
if (streamResponse.HttpStatusCode.GetValueOrDefault(-1) <= 0 && streamResponse.OriginalException != null)
68+
throw streamResponse.OriginalException;
69+
70+
// If the user explicitly wants a stream returned the undisposed stream
71+
if (typeof(Stream).IsAssignableFrom(typeof(T)))
72+
return streamResponse as ElasticsearchResponse<T>;
73+
74+
if (!typeof(VoidResponse).IsAssignableFrom(typeof(T))) return null;
75+
76+
var voidResponse = ElasticsearchResponse.CloneFrom<VoidResponse>(streamResponse, null);
77+
78+
return voidResponse as ElasticsearchResponse<T>;
79+
}
80+
81+
private ElasticsearchResponse<T> ReturnTypedResponse<T>(
82+
TransportRequestState<T> requestState,
83+
ElasticsearchResponse<Stream> streamResponse,
84+
out ElasticsearchServerError error)
85+
{
86+
error = null;
87+
88+
// Read to memory stream if needed
89+
var hasResponse = streamResponse.Response != null;
90+
var forceRead = this._settings.KeepRawResponse || typeof(T) == typeof(string) || typeof(T) == typeof(byte[]);
91+
byte[] bytes = null;
92+
if (hasResponse && forceRead)
93+
{
94+
var ms = this._memoryStreamProvider.New();
95+
streamResponse.Response.CopyTo(ms);
96+
bytes = ms.ToArray();
97+
streamResponse.Response.Close();
98+
streamResponse.Response = ms;
99+
streamResponse.Response.Position = 0;
100+
}
101+
// Set rawresponse if needed
102+
if (this._settings.KeepRawResponse) streamResponse.ResponseRaw = bytes;
103+
104+
var isValidResponse = IsValidResponse(requestState, streamResponse);
105+
if (isValidResponse)
106+
return this.StreamToTypedResponse<T>(streamResponse, requestState, bytes);
107+
108+
// If error read error
109+
error = GetErrorFromStream<T>(streamResponse.Response);
110+
var typedResponse = ElasticsearchResponse.CloneFrom<T>(streamResponse, default(T));
111+
this.SetStringOrByteResult(typedResponse, bytes);
112+
return typedResponse;
113+
}
114+
115+
private ElasticsearchResponse<T> CoordinateRequest<T>(TransportRequestState<T> requestState, int maxRetries, int retried, ref bool aliveResponse)
116+
{
117+
var pingRetryRequest = this.SelectNextNode(requestState);
118+
if (pingRetryRequest != null) return pingRetryRequest;
119+
120+
var streamResponse = this.DoElasticsearchCall(requestState);
121+
122+
aliveResponse = streamResponse.SuccessOrKnownError;
123+
124+
if (!this.DoneProcessing(streamResponse, requestState, maxRetries, retried))
125+
return null;
126+
127+
ElasticsearchServerError error = null;
128+
var typedResponse = this.ReturnStreamOrVoidResponse(requestState, streamResponse)
129+
?? this.ReturnTypedResponse(requestState, streamResponse, out error);
130+
131+
this.OptionallyCloseResponseStreamAndSetSuccess(requestState, error, typedResponse, streamResponse);
132+
if (error != null && this._settings.ThrowOnElasticsearchServerExceptions)
133+
throw new ElasticsearchServerException(error);
134+
return typedResponse;
135+
}
136+
137+
private ElasticsearchResponse<T> DoRequest<T>(TransportRequestState<T> requestState)
138+
{
139+
var sniffAuthResponse = this.TrySniffOnStaleClusterState(requestState);
140+
if (sniffAuthResponse != null) return sniffAuthResponse;
141+
142+
bool aliveResponse = false;
143+
bool seenError = false;
144+
int retried = requestState.Retried;
145+
int maxRetries = this._delegator.GetMaximumRetries(requestState.RequestConfiguration);
146+
147+
try
148+
{
149+
var response = this.CoordinateRequest(requestState, maxRetries, retried, ref aliveResponse);
150+
if (response != null) return response;
151+
}
152+
catch (ElasticsearchAuthenticationException e)
153+
{
154+
return this.HandleAuthenticationException(requestState, e);
155+
}
156+
catch (MaxRetryException)
157+
{
158+
//TODO ifdef ExceptionDispatchInfo.Capture(ex).Throw();
159+
throw;
160+
}
161+
catch (ElasticsearchServerException)
162+
{
163+
//TODO ifdef ExceptionDispatchInfo.Capture(ex).Throw();
164+
throw;
165+
}
166+
catch (Exception e)
167+
{
168+
requestState.SeenExceptions.Add(e);
169+
if (!requestState.UsingPooling || maxRetries == 0 && retried == 0)
170+
{
171+
//TODO ifdef ExceptionDispatchInfo.Capture(ex).Throw();
172+
throw;
173+
}
174+
seenError = true;
175+
return RetryRequest<T>(requestState);
176+
}
177+
finally
178+
{
179+
//make sure we always call markalive on the uri if the connection was succesful
180+
if (!seenError && aliveResponse)
181+
this._connectionPool.MarkAlive(requestState.CurrentNode);
182+
}
183+
return this.RetryRequest(requestState);
184+
}
185+
186+
private ElasticsearchResponse<T> RetryRequest<T>(TransportRequestState<T> requestState)
187+
{
188+
189+
var maxRetries = this._delegator.GetMaximumRetries(requestState.RequestConfiguration);
190+
191+
this._connectionPool.MarkDead(requestState.CurrentNode, this._settings.DeadTimeout, this._settings.MaxDeadTimeout);
192+
193+
try
194+
{
195+
this._delegator.SniffOnConnectionFailure(requestState);
196+
}
197+
catch (ElasticsearchAuthenticationException e)
198+
{
199+
//If the sniff already returned a 401 fail/return a response as early as possible
200+
return this.HandleAuthenticationException(requestState, e);
201+
}
202+
203+
this.ThrowMaxRetryExceptionWhenNeeded(requestState, maxRetries);
204+
205+
return this.DoRequest<T>(requestState);
206+
}
207+
208+
private ElasticsearchResponse<Stream> CallInToConnection<T>(TransportRequestState<T> requestState)
209+
{
210+
var uri = requestState.CreatePathOnCurrentNode();
211+
var postData = requestState.PostData;
212+
var requestConfiguration = requestState.RequestConfiguration;
213+
switch (requestState.Method.ToLowerInvariant())
214+
{
215+
case "post": return this._connection.PostSync(uri, postData, requestConfiguration);
216+
case "put": return this._connection.PutSync(uri, postData, requestConfiguration);
217+
case "head": return this._connection.HeadSync(uri, requestConfiguration);
218+
case "get": return this._connection.GetSync(uri, requestConfiguration);
219+
case "delete":
220+
return postData == null || postData.Length == 0
221+
? this._connection.DeleteSync(uri, requestConfiguration)
222+
: this._connection.DeleteSync(uri, postData, requestConfiguration);
223+
}
224+
throw new Exception("Unknown HTTP method " + requestState.Method);
225+
}
226+
227+
protected ElasticsearchResponse<T> StreamToTypedResponse<T>(
228+
ElasticsearchResponse<Stream> streamResponse,
229+
ITransportRequestState requestState,
230+
byte[] readBytes
231+
)
232+
{
233+
//set response
234+
if (typeof(T) == typeof(string) || typeof(T) == typeof(byte[]))
235+
{
236+
var clone = ElasticsearchResponse.CloneFrom<T>(streamResponse, default(T));
237+
this.SetStringOrByteResult(clone, readBytes);
238+
return clone;
239+
}
240+
var typedResponse = ElasticsearchResponse.CloneFrom<T>(streamResponse, default(T));
241+
using (streamResponse.Response)
242+
{
243+
var deserializationState = requestState.ResponseCreationOverride;
244+
var customConverter = deserializationState as Func<IElasticsearchResponse, Stream, T>;
245+
if (customConverter != null)
246+
{
247+
var t = customConverter(typedResponse, streamResponse.Response);
248+
typedResponse.Response = t;
249+
return typedResponse;
250+
}
251+
var deserialized = this._serializer.Deserialize<T>(streamResponse.Response);
252+
typedResponse.Response = deserialized;
253+
return typedResponse;
254+
}
255+
}
256+
257+
}
258+
}

0 commit comments

Comments
 (0)