diff --git a/client/rest/src/main/java/org/elasticsearch/client/Cancellable.java b/client/rest/src/main/java/org/elasticsearch/client/Cancellable.java new file mode 100644 index 0000000000000..6a31ab3fe1744 --- /dev/null +++ b/client/rest/src/main/java/org/elasticsearch/client/Cancellable.java @@ -0,0 +1,87 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client; + +import org.apache.http.client.methods.AbstractExecutionAwareRequest; +import org.apache.http.client.methods.HttpRequestBase; + +import java.util.concurrent.CancellationException; + +/** + * Represents an operation that can be cancelled. + * Returned when executing async requests through {@link RestClient#performRequestAsync(Request, ResponseListener)}, so that the request + * can be cancelled if needed. Cancelling a request will result in calling {@link AbstractExecutionAwareRequest#abort()} on the underlying + * request object, which will in turn cancel its corresponding {@link java.util.concurrent.Future}. + * Note that cancelling a request does not automatically translate to aborting its execution on the server side, which needs to be + * specifically implemented in each API. + */ +public class Cancellable { + + static final Cancellable NO_OP = new Cancellable(null) { + @Override + public void cancel() { + } + + @Override + void runIfNotCancelled(Runnable runnable) { + throw new UnsupportedOperationException(); + } + }; + + static Cancellable fromRequest(HttpRequestBase httpRequest) { + return new Cancellable(httpRequest); + } + + private final HttpRequestBase httpRequest; + + private Cancellable(HttpRequestBase httpRequest) { + this.httpRequest = httpRequest; + } + + /** + * Cancels the on-going request that is associated with the current instance of {@link Cancellable}. + * + */ + public synchronized void cancel() { + this.httpRequest.abort(); + } + + /** + * Executes some arbitrary code iff the on-going request has not been cancelled, otherwise throws {@link CancellationException}. + * This is needed to guarantee that cancelling a request works correctly even in case {@link #cancel()} is called between different + * attempts of the same request. The low-level client reuses the same instance of the {@link AbstractExecutionAwareRequest} by calling + * {@link AbstractExecutionAwareRequest#reset()} between subsequent retries. The {@link #cancel()} method can be called at anytime, + * and we need to handle the case where it gets called while there is no request being executed as one attempt may have failed and + * the subsequent attempt has not been started yet. + * If the request has already been cancelled we don't go ahead with the next attempt, and artificially raise the + * {@link CancellationException}, otherwise we run the provided {@link Runnable} which will reset the request and send the next attempt. + * Note that this method must be synchronized as well as the {@link #cancel()} method, to prevent a request from being cancelled + * when there is no future to cancel, which would make cancelling the request a no-op. + */ + synchronized void runIfNotCancelled(Runnable runnable) { + if (this.httpRequest.isAborted()) { + throw newCancellationException(); + } + runnable.run(); + } + + static CancellationException newCancellationException() { + return new CancellationException("request was cancelled"); + } +} diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java index 38185ac960926..a31732d742731 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java @@ -277,60 +277,64 @@ private ResponseOrResponseException convertResponse(InternalRequest request, Nod * @param responseListener the {@link ResponseListener} to notify when the * request is completed or fails */ - public void performRequestAsync(Request request, ResponseListener responseListener) { + public Cancellable performRequestAsync(Request request, ResponseListener responseListener) { try { FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener); InternalRequest internalRequest = new InternalRequest(request); performRequestAsync(nextNodes(), internalRequest, failureTrackingResponseListener); + return internalRequest.cancellable; } catch (Exception e) { responseListener.onFailure(e); + return Cancellable.NO_OP; } } private void performRequestAsync(final NodeTuple> nodeTuple, final InternalRequest request, final FailureTrackingResponseListener listener) { - final RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache); - client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, new FutureCallback() { - @Override - public void completed(HttpResponse httpResponse) { - try { - ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse); - if (responseOrResponseException.responseException == null) { - listener.onSuccess(responseOrResponseException.response); - } else { - if (nodeTuple.nodes.hasNext()) { - listener.trackFailure(responseOrResponseException.responseException); - performRequestAsync(nodeTuple, request, listener); + request.cancellable.runIfNotCancelled(() -> { + final RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache); + client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, new FutureCallback() { + @Override + public void completed(HttpResponse httpResponse) { + try { + ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse); + if (responseOrResponseException.responseException == null) { + listener.onSuccess(responseOrResponseException.response); } else { - listener.onDefinitiveFailure(responseOrResponseException.responseException); + if (nodeTuple.nodes.hasNext()) { + listener.trackFailure(responseOrResponseException.responseException); + performRequestAsync(nodeTuple, request, listener); + } else { + listener.onDefinitiveFailure(responseOrResponseException.responseException); + } } + } catch(Exception e) { + listener.onDefinitiveFailure(e); } - } catch(Exception e) { - listener.onDefinitiveFailure(e); } - } - @Override - public void failed(Exception failure) { - try { - RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, failure); - onFailure(context.node); - if (nodeTuple.nodes.hasNext()) { - listener.trackFailure(failure); - performRequestAsync(nodeTuple, request, listener); - } else { - listener.onDefinitiveFailure(failure); + @Override + public void failed(Exception failure) { + try { + RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, failure); + onFailure(context.node); + if (nodeTuple.nodes.hasNext()) { + listener.trackFailure(failure); + performRequestAsync(nodeTuple, request, listener); + } else { + listener.onDefinitiveFailure(failure); + } + } catch(Exception e) { + listener.onDefinitiveFailure(e); } - } catch(Exception e) { - listener.onDefinitiveFailure(e); } - } - @Override - public void cancelled() { - listener.onDefinitiveFailure(new ExecutionException("request was cancelled", null)); - } + @Override + public void cancelled() { + listener.onDefinitiveFailure(Cancellable.newCancellationException()); + } + }); }); } @@ -651,19 +655,20 @@ public void remove() { private class InternalRequest { private final Request request; - private final Map params; private final Set ignoreErrorCodes; private final HttpRequestBase httpRequest; + private final Cancellable cancellable; private final WarningsHandler warningsHandler; InternalRequest(Request request) { this.request = request; - this.params = new HashMap<>(request.getParameters()); + Map params = new HashMap<>(request.getParameters()); //ignore is a special parameter supported by the clients, shouldn't be sent to es String ignoreString = params.remove("ignore"); this.ignoreErrorCodes = getIgnoreErrorCodes(ignoreString, request.getMethod()); URI uri = buildUri(pathPrefix, request.getEndpoint(), params); this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity()); + this.cancellable = Cancellable.fromRequest(httpRequest); setHeaders(httpRequest, request.getOptions().getHeaders()); this.warningsHandler = request.getOptions().getWarningsHandler() == null ? RestClient.this.warningsHandler : request.getOptions().getWarningsHandler(); diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsIntegTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsIntegTests.java index 4cc16c45bab2f..38317af790ae7 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsIntegTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsIntegTests.java @@ -35,6 +35,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CancellationException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -42,7 +43,9 @@ import static org.elasticsearch.client.RestClientTestUtil.getAllStatusCodes; import static org.elasticsearch.client.RestClientTestUtil.randomErrorNoRetryStatusCode; import static org.elasticsearch.client.RestClientTestUtil.randomOkStatusCode; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -52,6 +55,7 @@ */ public class RestClientMultipleHostsIntegTests extends RestClientTestCase { + private static WaitForCancelHandler waitForCancelHandler; private static HttpServer[] httpServers; private static HttpHost[] httpHosts; private static boolean stoppedFirstHost = false; @@ -94,9 +98,34 @@ private static HttpServer createHttpServer() throws Exception { for (int statusCode : getAllStatusCodes()) { httpServer.createContext(pathPrefix + "/" + statusCode, new ResponseHandler(statusCode)); } + waitForCancelHandler = new WaitForCancelHandler(); + httpServer.createContext(pathPrefix + "/wait", waitForCancelHandler); return httpServer; } + private static class WaitForCancelHandler implements HttpHandler { + private CountDownLatch cancelHandlerLatch; + + void reset() { + cancelHandlerLatch = new CountDownLatch(1); + } + + void cancelDone() { + cancelHandlerLatch.countDown(); + } + + @Override + public void handle(HttpExchange exchange) throws IOException { + try { + cancelHandlerLatch.await(); + } catch (InterruptedException ignore) { + } finally { + exchange.sendResponseHeaders(200, 0); + exchange.close(); + } + } + } + private static class ResponseHandler implements HttpHandler { private final int statusCode; @@ -127,7 +156,7 @@ public void stopRandomHost() { //verify that shutting down some hosts doesn't matter as long as one working host is left behind if (httpServers.length > 1 && randomBoolean()) { List updatedHttpServers = new ArrayList<>(httpServers.length - 1); - int nodeIndex = randomInt(httpServers.length - 1); + int nodeIndex = randomIntBetween(0, httpServers.length - 1); if (0 == nodeIndex) { stoppedFirstHost = true; } @@ -139,7 +168,7 @@ public void stopRandomHost() { updatedHttpServers.add(httpServer); } } - httpServers = updatedHttpServers.toArray(new HttpServer[updatedHttpServers.size()]); + httpServers = updatedHttpServers.toArray(new HttpServer[0]); } } @@ -195,6 +224,40 @@ public void onFailure(Exception exception) { } } + public void testCancelAsyncRequests() throws Exception { + int numRequests = randomIntBetween(5, 20); + final CountDownLatch latch = new CountDownLatch(numRequests); + final List responses = new CopyOnWriteArrayList<>(); + final List exceptions = new CopyOnWriteArrayList<>(); + for (int i = 0; i < numRequests; i++) { + waitForCancelHandler.reset(); + final String method = RestClientTestUtil.randomHttpMethod(getRandom()); + //we don't test status codes that are subject to retries as they interfere with hosts being stopped + final int statusCode = randomBoolean() ? randomOkStatusCode(getRandom()) : randomErrorNoRetryStatusCode(getRandom()); + Cancellable cancellable = restClient.performRequestAsync(new Request(method, "/" + statusCode), new ResponseListener() { + @Override + public void onSuccess(Response response) { + responses.add(response); + latch.countDown(); + } + + @Override + public void onFailure(Exception exception) { + exceptions.add(exception); + latch.countDown(); + } + }); + cancellable.cancel(); + waitForCancelHandler.cancelDone(); + } + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertEquals(0, responses.size()); + assertEquals(numRequests, exceptions.size()); + for (Exception exception : exceptions) { + assertThat(exception, instanceOf(CancellationException.class)); + } + } + /** * Test host selector against a real server and * test what happens after calling @@ -249,13 +312,10 @@ Response getResponse() { } private NodeSelector firstPositionNodeSelector() { - return new NodeSelector() { - @Override - public void select(Iterable nodes) { - for (Iterator itr = nodes.iterator(); itr.hasNext();) { - if (httpHosts[0] != itr.next().getHost()) { - itr.remove(); - } + return nodes -> { + for (Iterator itr = nodes.iterator(); itr.hasNext();) { + if (httpHosts[0] != itr.next().getHost()) { + itr.remove(); } } }; diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java index f3df9bf3bfd37..21d4e9d0e81fe 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java @@ -243,19 +243,16 @@ public void testRoundRobinRetryErrors() throws Exception { } public void testNodeSelector() throws Exception { - NodeSelector firstPositionOnly = new NodeSelector() { - @Override - public void select(Iterable restClientNodes) { - boolean found = false; - for (Iterator itr = restClientNodes.iterator(); itr.hasNext();) { - if (nodes.get(0) == itr.next()) { - found = true; - } else { - itr.remove(); - } + NodeSelector firstPositionOnly = restClientNodes -> { + boolean found = false; + for (Iterator itr = restClientNodes.iterator(); itr.hasNext();) { + if (nodes.get(0) == itr.next()) { + found = true; + } else { + itr.remove(); } - assertTrue(found); } + assertTrue(found); }; RestClient restClient = createRestClient(firstPositionOnly); int rounds = between(1, 10); diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostIntegTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostIntegTests.java index e3fd3c311378b..c47fa55c9fe67 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostIntegTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostIntegTests.java @@ -26,11 +26,15 @@ import org.apache.http.Consts; import org.apache.http.Header; import org.apache.http.HttpHost; +import org.apache.http.HttpResponse; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpRequestBase; import org.apache.http.entity.ContentType; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.TargetAuthenticationStrategy; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.message.BasicHeader; import org.apache.http.nio.entity.NStringEntity; @@ -49,16 +53,22 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.client.RestClientTestUtil.getAllStatusCodes; import static org.elasticsearch.client.RestClientTestUtil.getHttpMethods; +import static org.elasticsearch.client.RestClientTestUtil.randomHttpMethod; import static org.elasticsearch.client.RestClientTestUtil.randomStatusCode; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -73,6 +83,7 @@ public class RestClientSingleHostIntegTests extends RestClientTestCase { private RestClient restClient; private String pathPrefix; private Header[] defaultHeaders; + private WaitForCancelHandler waitForCancelHandler; @Before public void startHttpServer() throws Exception { @@ -89,9 +100,31 @@ private HttpServer createHttpServer() throws Exception { for (int statusCode : getAllStatusCodes()) { httpServer.createContext(pathPrefix + "/" + statusCode, new ResponseHandler(statusCode)); } + waitForCancelHandler = new WaitForCancelHandler(); + httpServer.createContext(pathPrefix + "/wait", waitForCancelHandler); return httpServer; } + private class WaitForCancelHandler implements HttpHandler { + + private final CountDownLatch cancelHandlerLatch = new CountDownLatch(1); + + void cancelDone() { + cancelHandlerLatch.countDown(); + } + + @Override + public void handle(HttpExchange exchange) throws IOException { + try { + cancelHandlerLatch.await(); + } catch (InterruptedException ignore) { + } finally { + exchange.sendResponseHeaders(200, 0); + exchange.close(); + } + } + } + private static class ResponseHandler implements HttpHandler { private final int statusCode; @@ -201,6 +234,75 @@ public void onFailure(Exception exception) { } } + public void testCancelAsyncRequest() throws Exception { + Request request = new Request(randomHttpMethod(getRandom()), "/wait"); + CountDownLatch requestLatch = new CountDownLatch(1); + AtomicReference error = new AtomicReference<>(); + Cancellable cancellable = restClient.performRequestAsync(request, new ResponseListener() { + @Override + public void onSuccess(Response response) { + throw new AssertionError("onResponse called unexpectedly"); + } + + @Override + public void onFailure(Exception exception) { + error.set(exception); + requestLatch.countDown(); + } + }); + cancellable.cancel(); + waitForCancelHandler.cancelDone(); + assertTrue(requestLatch.await(5, TimeUnit.SECONDS)); + assertThat(error.get(), instanceOf(CancellationException.class)); + } + + /** + * This test verifies some assumptions that we rely upon around the way the async http client works when reusing the same request + * throughout multiple retries, and the use of the {@link HttpRequestBase#abort()} method. + */ + public void testRequestResetAndAbort() throws Exception { + try (CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create().build()) { + client.start(); + HttpHost httpHost = new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort()); + HttpGet httpGet = new HttpGet(pathPrefix + "/200"); + + //calling abort before the request is sent is a no-op + httpGet.abort(); + assertTrue(httpGet.isAborted()); + + { + httpGet.reset(); + assertFalse(httpGet.isAborted()); + httpGet.abort();//this has no effect on the next call (although isAborted will return true until the next reset) + Future future = client.execute(httpHost, httpGet, null); + assertEquals(200, future.get().getStatusLine().getStatusCode()); + assertFalse(future.isCancelled()); + } + { + httpGet.reset(); + Future future = client.execute(httpHost, httpGet, null); + assertFalse(httpGet.isAborted()); + httpGet.abort(); + assertTrue(httpGet.isAborted()); + try { + assertTrue(future.isCancelled()); + future.get(); + throw new AssertionError("exception should have been thrown"); + } catch(CancellationException e) { + //expected + } + } + { + httpGet.reset(); + assertFalse(httpGet.isAborted()); + Future future = client.execute(httpHost, httpGet, null); + assertFalse(httpGet.isAborted()); + assertEquals(200, future.get().getStatusLine().getStatusCode()); + assertFalse(future.isCancelled()); + } + } + } + /** * End to end test for headers. We test it explicitly against a real http client as there are different ways * to set/add headers to the {@link org.apache.http.client.HttpClient}. @@ -356,7 +458,6 @@ public void testAuthCredentialsAreNotClearedOnAuthChallenge() throws Exception { assertThat(response200.getHeader("Authorization"), startsWith("Basic")); } } - } public void testUrlWithoutLeadingSlash() throws Exception { diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java index 3894fca7d0a47..dd133f90daadb 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java @@ -52,7 +52,6 @@ import org.junit.After; import org.junit.Before; import org.mockito.ArgumentCaptor; -import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import javax.net.ssl.SSLHandshakeException; @@ -68,7 +67,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -126,30 +124,24 @@ public void createRestClient() { static CloseableHttpAsyncClient mockHttpClient(final ExecutorService exec) { CloseableHttpAsyncClient httpClient = mock(CloseableHttpAsyncClient.class); when(httpClient.execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class), - any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer>() { - @Override - public Future answer(InvocationOnMock invocationOnMock) throws Throwable { + any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer((Answer>) invocationOnMock -> { final HttpAsyncRequestProducer requestProducer = (HttpAsyncRequestProducer) invocationOnMock.getArguments()[0]; final FutureCallback futureCallback = (FutureCallback) invocationOnMock.getArguments()[3]; // Call the callback asynchronous to better simulate how async http client works - return exec.submit(new Callable() { - @Override - public HttpResponse call() throws Exception { - if (futureCallback != null) { - try { - HttpResponse httpResponse = responseOrException(requestProducer); - futureCallback.completed(httpResponse); - } catch(Exception e) { - futureCallback.failed(e); - } - return null; + return exec.submit(() -> { + if (futureCallback != null) { + try { + HttpResponse httpResponse = responseOrException(requestProducer); + futureCallback.completed(httpResponse); + } catch(Exception e) { + futureCallback.failed(e); } - return responseOrException(requestProducer); + return null; } + return responseOrException(requestProducer); }); - } - }); + }); return httpClient; } diff --git a/client/rest/src/test/java/org/elasticsearch/client/documentation/RestClientDocumentation.java b/client/rest/src/test/java/org/elasticsearch/client/documentation/RestClientDocumentation.java index 8653db4226fe1..7ade990e1f811 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/documentation/RestClientDocumentation.java +++ b/client/rest/src/test/java/org/elasticsearch/client/documentation/RestClientDocumentation.java @@ -36,6 +36,7 @@ import org.apache.http.ssl.SSLContextBuilder; import org.apache.http.ssl.SSLContexts; import org.apache.http.util.EntityUtils; +import org.elasticsearch.client.Cancellable; import org.elasticsearch.client.HttpAsyncResponseConsumerFactory; import org.elasticsearch.client.Node; import org.elasticsearch.client.NodeSelector; @@ -206,16 +207,17 @@ public HttpAsyncClientBuilder customizeHttpClient( Request request = new Request( "GET", // <1> "/"); // <2> - restClient.performRequestAsync(request, new ResponseListener() { - @Override - public void onSuccess(Response response) { - // <3> - } + Cancellable cancellable = restClient.performRequestAsync(request, + new ResponseListener() { + @Override + public void onSuccess(Response response) { + // <3> + } - @Override - public void onFailure(Exception exception) { - // <4> - } + @Override + public void onFailure(Exception exception) { + // <4> + } }); //end::rest-client-async } @@ -271,6 +273,26 @@ public void onFailure(Exception exception) { latch.await(); //end::rest-client-async-example } + { + //tag::rest-client-async-cancel + Request request = new Request("GET", "/posts/_search"); + Cancellable cancellable = restClient.performRequestAsync( + request, + new ResponseListener() { + @Override + public void onSuccess(Response response) { + // <1> + } + + @Override + public void onFailure(Exception exception) { + // <2> + } + } + ); + cancellable.cancel(); + //end::rest-client-async-cancel + } { //tag::rest-client-response2 Response response = restClient.performRequest(new Request("GET", "/")); diff --git a/docs/java-rest/low-level/usage.asciidoc b/docs/java-rest/low-level/usage.asciidoc index 06bd77c7710dd..d0f4b070a55d6 100644 --- a/docs/java-rest/low-level/usage.asciidoc +++ b/docs/java-rest/low-level/usage.asciidoc @@ -224,7 +224,7 @@ Once the `RestClient` has been created, requests can be sent by calling either will block the calling thread and return the `Response` when the request is successful or throw an exception if it fails. `performRequestAsync` is asynchronous and accepts a `ResponseListener` argument that it calls with a -`Response` when the request is successful or with an `Exception` if it4 fails. +`Response` when the request is successful or with an `Exception` if it fails. This is synchronous: @@ -329,6 +329,22 @@ include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-async-examp <2> Handle the returned exception, due to communication error or a response with status code that indicates an error +==== Cancelling asynchronous requests + +The `performRequestAsync` method returns a `Cancellable` that exposes a single +public method called `cancel`. Such method can be called to cancel the on-going +request. Cancelling a request will result in aborting the http request through +the underlying http client. On the server side, this does not automatically +translate to the execution of that request being cancelled, which needs to be +specifically implemented in the API itself. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-async-cancel] +-------------------------------------------------- +<1> Process the returned response, in case it was ready before the request got cancelled +<2> Handle the returned exception, which will most likely be a `CancellationException` as the request got cancelled + [[java-rest-low-usage-responses]] === Reading responses