Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;

import org.apache.http.client.methods.HttpRequestBase;

import java.util.concurrent.CancellationException;

/**
* Represents an operation that can be cancelled.
* Returned when executing async requests through {@link RestClient#performRequestAsync(Request, ResponseListener)}, so that the request
* can be cancelled if needed.
*/
public class Cancellable {

static final Cancellable NO_OP = new Cancellable(null) {
@Override
public synchronized void cancel() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the use of synchronized here seems a bit redundant since one method is empty and the other rethrows an exception

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true, it's a leftover.

}

@Override
synchronized void runIfNotCancelled(Runnable runnable) {
throw new UnsupportedOperationException();
}
};

static Cancellable fromRequest(HttpRequestBase httpRequest) {
return new Cancellable(httpRequest);
}

private final HttpRequestBase httpRequest;

private Cancellable(HttpRequestBase httpRequest) {
this.httpRequest = httpRequest;
}

/**
* Cancels the on-going request that is associated with the current instance of {@link Cancellable}.
*/
public synchronized void cancel() {
this.httpRequest.abort();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe check here if the request is cancelled? Also does this need to be synchronized?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I need to check if it's already cancelled. That check is already performed by the client, see AbstractExecutionAwareRequest#abort, hence the abort will do something only the first time it's called (unless the request gets reset).

It is synchronized to make sure that a request does not get cancelled when we are in between one attempt and the following one, in which case, if we have already reset the request but we have not yet called execute, the abort will do nothing as the cancellable ref is not set internally.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, probably not much that can go wrong here. Worst case you probably get some kind of exception from apache httpclient. Thanks for picking this up.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the problem is that as a user you would call abort, which would do nothing and you don't get feedback on that. And any subsequent abort call would still do nothing as the request is marked aborted although calling abort did nothing. With the synchronized blocks we make sure that abort is only called when a cancellable ref is set internally, hence abort does something and is reliable Thank you for opening the issue and having a look at this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clear. In any case good enough for my use case, which would be having this called from a suspendCancellableCoRoutine. This will work beautifully for that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Cancellable could be a core interface exposing just the cancel (and something more.... see below).
Also Cancellable is an apache class org.apache.http.concurrent.Cancellable, with this signature, so it would be better to call this class Abortable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer Cancellable over Abortable, and the package will differ so it should be ok.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Abortable is a terrible name indeed :-) .
Cancellable sounds more like an interface, what about CancellationHandler ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't really need a new interface, a Runnable would be enough since the user can only trigger one action (cancel/abort). Something like:

Runnable cancel = client. performRequestAsync(...)
...
cancel.run();

The javadocs need to explain what cancellation means anyway so I don't think that a new interface adds anything.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am slightly in favour of having a specific class name, just because it's a publicly exposed API and Runnable is a bit too generic. I would not add a new interface (keep what we have as a class), as it would have a single impl and that is redundant. I don't have a strong opinion though. Curious to hear from others.

}

/**
* Executes some arbitrary code iff the on-going request has not been cancelled, otherwise it throws {@link CancellationException}.
* This is needed to guarantee that cancelling a request works correctly even in case {@link #cancel()} is called between different
* attempts of the same request. If the request has already been cancelled we don't go ahead, otherwise we run the provided
* {@link Runnable} which will reset the request and send the next attempt.
*/
synchronized void runIfNotCancelled(Runnable runnable) {
if (this.httpRequest.isAborted()) {
throw new CancellationException("request was cancelled");
}
runnable.run();
}
}
78 changes: 42 additions & 36 deletions client/rest/src/main/java/org/elasticsearch/client/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -277,60 +278,64 @@ private ResponseOrResponseException convertResponse(InternalRequest request, Nod
* @param responseListener the {@link ResponseListener} to notify when the
* request is completed or fails
*/
public void performRequestAsync(Request request, ResponseListener responseListener) {
public Cancellable performRequestAsync(Request request, ResponseListener responseListener) {
try {
FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener);
InternalRequest internalRequest = new InternalRequest(request);
performRequestAsync(nextNodes(), internalRequest, failureTrackingResponseListener);
return internalRequest.cancellable;
} catch (Exception e) {
responseListener.onFailure(e);
return Cancellable.NO_OP;
}
}

private void performRequestAsync(final NodeTuple<Iterator<Node>> nodeTuple,
final InternalRequest request,
final FailureTrackingResponseListener listener) {
final RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache);
client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse httpResponse) {
try {
ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse);
if (responseOrResponseException.responseException == null) {
listener.onSuccess(responseOrResponseException.response);
} else {
if (nodeTuple.nodes.hasNext()) {
listener.trackFailure(responseOrResponseException.responseException);
performRequestAsync(nodeTuple, request, listener);
request.cancellable.runIfNotCancelled(() -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This provided runnable is executed synchronously so I don't see how the cancellable can already be cancelled ?
In other word, when is this check needed ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need a Runnable
because

client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, ...)

already returns a Future, thus it is a non blocking operation. ( It is what I exploited in the solution that I sketched up ).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the synchronized bit is important here. one can try to cancel when we are in between different attempts of the same request: internally, there is nothing to cancel hence abort will do nothing in that case. I want to make sure that calling cancel with our client gives some guarantee that it will have some effect.

final RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache);
client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse httpResponse) {
try {
ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse);
if (responseOrResponseException.responseException == null) {
listener.onSuccess(responseOrResponseException.response);
} else {
listener.onDefinitiveFailure(responseOrResponseException.responseException);
if (nodeTuple.nodes.hasNext()) {
listener.trackFailure(responseOrResponseException.responseException);
performRequestAsync(nodeTuple, request, listener);
} else {
listener.onDefinitiveFailure(responseOrResponseException.responseException);
}
}
} catch(Exception e) {
listener.onDefinitiveFailure(e);
}
} catch(Exception e) {
listener.onDefinitiveFailure(e);
}
}

@Override
public void failed(Exception failure) {
try {
RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, failure);
onFailure(context.node);
if (nodeTuple.nodes.hasNext()) {
listener.trackFailure(failure);
performRequestAsync(nodeTuple, request, listener);
} else {
listener.onDefinitiveFailure(failure);
@Override
public void failed(Exception failure) {
try {
RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, failure);
onFailure(context.node);
if (nodeTuple.nodes.hasNext()) {
listener.trackFailure(failure);
performRequestAsync(nodeTuple, request, listener);
} else {
listener.onDefinitiveFailure(failure);
}
} catch(Exception e) {
listener.onDefinitiveFailure(e);
}
} catch(Exception e) {
listener.onDefinitiveFailure(e);
}
}

@Override
public void cancelled() {
listener.onDefinitiveFailure(new ExecutionException("request was cancelled", null));
}
@Override
public void cancelled() {
listener.onDefinitiveFailure(new CancellationException("request was cancelled"));
}
});
});
}

Expand Down Expand Up @@ -651,19 +656,20 @@ public void remove() {

private class InternalRequest {
private final Request request;
private final Map<String, String> params;
private final Set<Integer> ignoreErrorCodes;
private final HttpRequestBase httpRequest;
private final Cancellable cancellable;
private final WarningsHandler warningsHandler;

InternalRequest(Request request) {
this.request = request;
this.params = new HashMap<>(request.getParameters());
Map<String, String> params = new HashMap<>(request.getParameters());
//ignore is a special parameter supported by the clients, shouldn't be sent to es
String ignoreString = params.remove("ignore");
this.ignoreErrorCodes = getIgnoreErrorCodes(ignoreString, request.getMethod());
URI uri = buildUri(pathPrefix, request.getEndpoint(), params);
this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity());
this.cancellable = Cancellable.fromRequest(httpRequest);
setHeaders(httpRequest, request.getOptions().getHeaders());
this.warningsHandler = request.getOptions().getWarningsHandler() == null ?
RestClient.this.warningsHandler : request.getOptions().getWarningsHandler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,17 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.client.RestClientTestUtil.getAllStatusCodes;
import static org.elasticsearch.client.RestClientTestUtil.randomErrorNoRetryStatusCode;
import static org.elasticsearch.client.RestClientTestUtil.randomOkStatusCode;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand All @@ -52,6 +55,7 @@
*/
public class RestClientMultipleHostsIntegTests extends RestClientTestCase {

private static WaitForCancelHandler waitForCancelHandler;
private static HttpServer[] httpServers;
private static HttpHost[] httpHosts;
private static boolean stoppedFirstHost = false;
Expand Down Expand Up @@ -94,9 +98,34 @@ private static HttpServer createHttpServer() throws Exception {
for (int statusCode : getAllStatusCodes()) {
httpServer.createContext(pathPrefix + "/" + statusCode, new ResponseHandler(statusCode));
}
waitForCancelHandler = new WaitForCancelHandler();
httpServer.createContext(pathPrefix + "/wait", waitForCancelHandler);
return httpServer;
}

private static class WaitForCancelHandler implements HttpHandler {
private CountDownLatch cancelHandlerLatch;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be final

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it gets reset before each request in testCancelAsyncRequests, so it can't be final I think


void reset() {
cancelHandlerLatch = new CountDownLatch(1);
}

void cancelDone() {
cancelHandlerLatch.countDown();
}

@Override
public void handle(HttpExchange exchange) throws IOException {
try {
cancelHandlerLatch.await();
} catch (InterruptedException ignore) {
} finally {
exchange.sendResponseHeaders(200, 0);
exchange.close();
}
}
}

private static class ResponseHandler implements HttpHandler {
private final int statusCode;

Expand Down Expand Up @@ -127,7 +156,7 @@ public void stopRandomHost() {
//verify that shutting down some hosts doesn't matter as long as one working host is left behind
if (httpServers.length > 1 && randomBoolean()) {
List<HttpServer> updatedHttpServers = new ArrayList<>(httpServers.length - 1);
int nodeIndex = randomInt(httpServers.length - 1);
int nodeIndex = randomIntBetween(0, httpServers.length - 1);
if (0 == nodeIndex) {
stoppedFirstHost = true;
}
Expand All @@ -139,7 +168,7 @@ public void stopRandomHost() {
updatedHttpServers.add(httpServer);
}
}
httpServers = updatedHttpServers.toArray(new HttpServer[updatedHttpServers.size()]);
httpServers = updatedHttpServers.toArray(new HttpServer[0]);
}
}

Expand Down Expand Up @@ -195,6 +224,40 @@ public void onFailure(Exception exception) {
}
}

public void testCancelAsyncRequests() throws Exception {
int numRequests = randomIntBetween(5, 20);
final CountDownLatch latch = new CountDownLatch(numRequests);
final List<Response> responses = new CopyOnWriteArrayList<>();
final List<Exception> exceptions = new CopyOnWriteArrayList<>();
for (int i = 0; i < numRequests; i++) {
waitForCancelHandler.reset();
final String method = RestClientTestUtil.randomHttpMethod(getRandom());
//we don't test status codes that are subject to retries as they interfere with hosts being stopped
final int statusCode = randomBoolean() ? randomOkStatusCode(getRandom()) : randomErrorNoRetryStatusCode(getRandom());
Cancellable cancellable = restClient.performRequestAsync(new Request(method, "/" + statusCode), new ResponseListener() {
@Override
public void onSuccess(Response response) {
responses.add(response);
latch.countDown();
}

@Override
public void onFailure(Exception exception) {
exceptions.add(exception);
latch.countDown();
}
});
cancellable.cancel();
waitForCancelHandler.cancelDone();
}
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertEquals(0, responses.size());
assertEquals(numRequests, exceptions.size());
for (Exception exception : exceptions) {
assertThat(exception, instanceOf(CancellationException.class));
}
}

/**
* Test host selector against a real server <strong>and</strong>
* test what happens after calling
Expand Down Expand Up @@ -249,13 +312,10 @@ Response getResponse() {
}

private NodeSelector firstPositionNodeSelector() {
return new NodeSelector() {
@Override
public void select(Iterable<Node> nodes) {
for (Iterator<Node> itr = nodes.iterator(); itr.hasNext();) {
if (httpHosts[0] != itr.next().getHost()) {
itr.remove();
}
return nodes -> {
for (Iterator<Node> itr = nodes.iterator(); itr.hasNext();) {
if (httpHosts[0] != itr.next().getHost()) {
itr.remove();
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,19 +243,16 @@ public void testRoundRobinRetryErrors() throws Exception {
}

public void testNodeSelector() throws Exception {
NodeSelector firstPositionOnly = new NodeSelector() {
@Override
public void select(Iterable<Node> restClientNodes) {
boolean found = false;
for (Iterator<Node> itr = restClientNodes.iterator(); itr.hasNext();) {
if (nodes.get(0) == itr.next()) {
found = true;
} else {
itr.remove();
}
NodeSelector firstPositionOnly = restClientNodes -> {
boolean found = false;
for (Iterator<Node> itr = restClientNodes.iterator(); itr.hasNext();) {
if (nodes.get(0) == itr.next()) {
found = true;
} else {
itr.remove();
}
assertTrue(found);
}
assertTrue(found);
};
RestClient restClient = createRestClient(firstPositionOnly);
int rounds = between(1, 10);
Expand Down
Loading