1818 */
1919package org .elasticsearch .repositories .s3 ;
2020
21+ import com .amazonaws .http .AmazonHttpClient ;
22+ import com .amazonaws .services .s3 .Headers ;
2123import com .sun .net .httpserver .HttpExchange ;
2224import com .sun .net .httpserver .HttpHandler ;
2325import com .sun .net .httpserver .HttpServer ;
26+ import org .apache .http .HttpStatus ;
27+ import org .elasticsearch .common .Strings ;
2428import org .elasticsearch .common .SuppressForbidden ;
2529import org .elasticsearch .common .bytes .BytesReference ;
2630import org .elasticsearch .common .io .Streams ;
5357import java .util .Map ;
5458import java .util .concurrent .ConcurrentHashMap ;
5559import java .util .concurrent .ConcurrentMap ;
60+ import java .util .concurrent .atomic .AtomicInteger ;
5661
5762import static java .nio .charset .StandardCharsets .UTF_8 ;
5863import static org .hamcrest .Matchers .nullValue ;
@@ -70,7 +75,11 @@ public static void startHttpServer() throws Exception {
7075
7176 @ Before
7277 public void setUpHttpServer () {
73- httpServer .createContext ("/bucket" , new InternalHttpHandler ());
78+ HttpHandler handler = new InternalHttpHandler ();
79+ if (randomBoolean ()) {
80+ handler = new ErroneousHttpHandler (handler , randomIntBetween (2 , 3 ));
81+ }
82+ httpServer .createContext ("/bucket" , handler );
7483 }
7584
7685 @ AfterClass
@@ -114,6 +123,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
114123 return Settings .builder ()
115124 .put (Settings .builder ()
116125 .put (S3ClientSettings .ENDPOINT_SETTING .getConcreteSettingForNamespace ("test" ).getKey (), endpoint )
126+ // Disable chunked encoding as it simplifies a lot the request parsing on the httpServer side
117127 .put (S3ClientSettings .DISABLE_CHUNKED_ENCODING .getConcreteSettingForNamespace ("test" ).getKey (), true )
118128 .build ())
119129 .put (super .nodeSettings (nodeOrdinal ))
@@ -130,7 +140,6 @@ public TestS3RepositoryPlugin(final Settings settings) {
130140 @ Override
131141 public List <Setting <?>> getSettings () {
132142 final List <Setting <?>> settings = new ArrayList <>(super .getSettings ());
133- // Disable chunked encoding as it simplifies a lot the request parsing on the httpServer side
134143 settings .add (S3ClientSettings .DISABLE_CHUNKED_ENCODING );
135144 return settings ;
136145 }
@@ -229,4 +238,50 @@ public void handle(final HttpExchange exchange) throws IOException {
229238 }
230239 }
231240 }
241+
242+ /**
243+ * HTTP handler that injects random S3 service errors
244+ *
245+ * Note: it is not a good idea to allow this handler to simulate too many errors as it would
246+ * slow down the test suite and/or could trigger SDK client request throttling (and request
247+ * would fail before reaching the max retry attempts - this can be mitigated by disabling
248+ * {@link S3ClientSettings#USE_THROTTLE_RETRIES_SETTING})
249+ */
250+ @ SuppressForbidden (reason = "this test uses a HttpServer to emulate an S3 endpoint" )
251+ private static class ErroneousHttpHandler implements HttpHandler {
252+
253+ // first key is the remote address, second key is the HTTP request unique id provided by the AWS SDK client,
254+ // value is the number of times the request has been seen
255+ private final Map <String , AtomicInteger > requests ;
256+ private final HttpHandler delegate ;
257+ private final int maxErrorsPerRequest ;
258+
259+ private ErroneousHttpHandler (final HttpHandler delegate , final int maxErrorsPerRequest ) {
260+ this .requests = new ConcurrentHashMap <>();
261+ this .delegate = delegate ;
262+ this .maxErrorsPerRequest = maxErrorsPerRequest ;
263+ assert maxErrorsPerRequest > 1 ;
264+ }
265+
266+ @ Override
267+ public void handle (final HttpExchange exchange ) throws IOException {
268+ final String requestId = exchange .getRequestHeaders ().getFirst (AmazonHttpClient .HEADER_SDK_TRANSACTION_ID );
269+ assert Strings .hasText (requestId );
270+
271+ final int count = requests .computeIfAbsent (requestId , req -> new AtomicInteger (0 )).incrementAndGet ();
272+ if (count >= maxErrorsPerRequest || randomBoolean ()) {
273+ requests .remove (requestId );
274+ delegate .handle (exchange );
275+ } else {
276+ handleAsError (exchange , requestId );
277+ }
278+ }
279+
280+ private void handleAsError (final HttpExchange exchange , final String requestId ) throws IOException {
281+ Streams .readFully (exchange .getRequestBody ());
282+ exchange .getResponseHeaders ().add (Headers .REQUEST_ID , requestId );
283+ exchange .sendResponseHeaders (HttpStatus .SC_INTERNAL_SERVER_ERROR , -1 );
284+ exchange .close ();
285+ }
286+ }
232287}
0 commit comments