Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class ListenerTimeouts {

Expand All @@ -41,9 +42,29 @@ public class ListenerTimeouts {
* @param listenerName name of the listener for timeout exception
* @return the wrapped listener that will timeout
*/
public static <Response> ActionListener<Response> wrapWithTimeout(ThreadPool threadPool, ActionListener<Response> listener,
public static <Response> ActionListener<Response> wrapWithTimeout(ThreadPool threadPool, ActionListener<Response> listener,
TimeValue timeout, String executor, String listenerName) {
TimeoutableListener<Response> wrappedListener = new TimeoutableListener<>(listener, timeout, listenerName);
return wrapWithTimeout(threadPool, timeout, executor, listener, (ignore) -> {
String timeoutMessage = "[" + listenerName + "]" + " timed out after [" + timeout + "]";
listener.onFailure(new ElasticsearchTimeoutException(timeoutMessage));
});
}

/**
* Wraps a listener with a listener that can timeout. After the timeout period the
* onTimeout Runnable will be called.
*
* @param threadPool used to schedule the timeout
* @param timeout period before listener failed
* @param executor to use for scheduling timeout
* @param listener to that can timeout
* @param onTimeout consumer will be called and the resulting wrapper will be passed to it as a parameter
* @return the wrapped listener that will timeout
*/
public static <Response> ActionListener<Response> wrapWithTimeout(ThreadPool threadPool, TimeValue timeout, String executor,
ActionListener<Response> listener,
Consumer<ActionListener<Response>> onTimeout) {
TimeoutableListener<Response> wrappedListener = new TimeoutableListener<>(listener, onTimeout);
wrappedListener.cancellable = threadPool.schedule(wrappedListener, timeout, executor);
return wrappedListener;
}
Expand All @@ -52,14 +73,12 @@ private static class TimeoutableListener<Response> implements ActionListener<Res

private final AtomicBoolean isDone = new AtomicBoolean(false);
private final ActionListener<Response> delegate;
private final TimeValue timeout;
private final String listenerName;
private final Consumer<ActionListener<Response>> onTimeout;
private volatile Scheduler.ScheduledCancellable cancellable;

private TimeoutableListener(ActionListener<Response> delegate, TimeValue timeout, String listenerName) {
private TimeoutableListener(ActionListener<Response> delegate, Consumer<ActionListener<Response>> onTimeout) {
this.delegate = delegate;
this.timeout = timeout;
this.listenerName = listenerName;
this.onTimeout = onTimeout;
}

@Override
Expand All @@ -81,8 +100,7 @@ public void onFailure(Exception e) {
@Override
public void run() {
if (isDone.compareAndSet(false, true)) {
String timeoutMessage = "[" + listenerName + "]" + " timed out after [" + timeout + "]";
delegate.onFailure(new ElasticsearchTimeoutException(timeoutMessage));
onTimeout.accept(this);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ Listener getSearchProgressActionListener() {
/**
* Update the expiration time of the (partial) response.
*/
@Override
public void setExpirationTime(long expirationTimeMillis) {
this.expirationTimeMillis = expirationTimeMillis;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestStatusToXContentListener;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;

import java.util.List;
Expand All @@ -33,7 +34,7 @@ public String getName() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
GetAsyncSearchAction.Request get = new GetAsyncSearchAction.Request(request.param("id"));
GetAsyncResultRequest get = new GetAsyncResultRequest(request.param("id"));
if (request.hasParam("wait_for_completion_timeout")) {
get.setWaitForCompletion(request.paramAsTime("wait_for_completion_timeout", get.getWaitForCompletion()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@
*/
package org.elasticsearch.xpack.search;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ActionFilters;
Expand All @@ -19,23 +14,21 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncResultsService;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;

import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;

public class TransportGetAsyncSearchAction extends HandledTransportAction<GetAsyncSearchAction.Request, AsyncSearchResponse> {
private final Logger logger = LogManager.getLogger(TransportGetAsyncSearchAction.class);
private final ClusterService clusterService;
public class TransportGetAsyncSearchAction extends HandledTransportAction<GetAsyncResultRequest, AsyncSearchResponse> {
private final AsyncResultsService<AsyncSearchTask, AsyncSearchResponse> resultsService;
private final TransportService transportService;
private final AsyncTaskIndexService<AsyncSearchResponse> store;

@Inject
public TransportGetAsyncSearchAction(TransportService transportService,
Expand All @@ -44,113 +37,23 @@ public TransportGetAsyncSearchAction(TransportService transportService,
NamedWriteableRegistry registry,
Client client,
ThreadPool threadPool) {
super(GetAsyncSearchAction.NAME, transportService, actionFilters, GetAsyncSearchAction.Request::new);
this.clusterService = clusterService;
super(GetAsyncSearchAction.NAME, transportService, actionFilters, GetAsyncResultRequest::new);
this.transportService = transportService;
this.store = new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService, threadPool.getThreadContext(), client,
ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry);
AsyncTaskIndexService<AsyncSearchResponse> store = new AsyncTaskIndexService<>(AsyncSearch.INDEX, clusterService,
threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry);
resultsService = new AsyncResultsService<>(store, true, AsyncSearchTask.class, AsyncSearchTask::addCompletionListener,
transportService.getTaskManager(), clusterService);
}

@Override
protected void doExecute(Task task, GetAsyncSearchAction.Request request, ActionListener<AsyncSearchResponse> listener) {
try {
long nowInMillis = System.currentTimeMillis();
AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId());
DiscoveryNode node = clusterService.state().nodes().get(searchId.getTaskId().getNodeId());
if (clusterService.localNode().getId().equals(searchId.getTaskId().getNodeId()) || node == null) {
if (request.getKeepAlive().getMillis() > 0) {
long expirationTime = nowInMillis + request.getKeepAlive().getMillis();
store.updateExpirationTime(searchId.getDocId(), expirationTime,
ActionListener.wrap(
p -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener),
exc -> {
//don't log when: the async search document or its index is not found. That can happen if an invalid
//search id is provided or no async search initial response has been stored yet.
RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc));
if (status != RestStatus.NOT_FOUND) {
logger.error(() -> new ParameterizedMessage("failed to update expiration time for async-search [{}]",
searchId.getEncoded()), exc);
}
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
}
));
} else {
getSearchResponseFromTask(searchId, request, nowInMillis, -1, listener);
}
} else {
TransportRequestOptions.Builder builder = TransportRequestOptions.builder();
transportService.sendRequest(node, GetAsyncSearchAction.NAME, request, builder.build(),
new ActionListenerResponseHandler<>(listener, AsyncSearchResponse::new, ThreadPool.Names.SAME));
}
} catch (Exception exc) {
listener.onFailure(exc);
protected void doExecute(Task task, GetAsyncResultRequest request, ActionListener<AsyncSearchResponse> listener) {
DiscoveryNode node = resultsService.getNode(request.getId());
if (node == null || resultsService.isLocalNode(node)) {
resultsService.retrieveResult(request, listener);
} else {
TransportRequestOptions.Builder builder = TransportRequestOptions.builder();
transportService.sendRequest(node, GetAsyncSearchAction.NAME, request, builder.build(),
new ActionListenerResponseHandler<>(listener, AsyncSearchResponse::new, ThreadPool.Names.SAME));
}
}

private void getSearchResponseFromTask(AsyncExecutionId searchId,
GetAsyncSearchAction.Request request,
long nowInMillis,
long expirationTimeMillis,
ActionListener<AsyncSearchResponse> listener) {
try {
final AsyncSearchTask task = store.getTask(taskManager, searchId, AsyncSearchTask.class);
if (task == null) {
getSearchResponseFromIndex(searchId, request, nowInMillis, listener);
return;
}

if (task.isCancelled()) {
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
return;
}

if (expirationTimeMillis != -1) {
task.setExpirationTime(expirationTimeMillis);
}
task.addCompletionListener(new ActionListener<>() {
@Override
public void onResponse(AsyncSearchResponse response) {
sendFinalResponse(request, response, nowInMillis, listener);
}

@Override
public void onFailure(Exception exc) {
listener.onFailure(exc);
}
}, request.getWaitForCompletion());
} catch (Exception exc) {
listener.onFailure(exc);
}
}

private void getSearchResponseFromIndex(AsyncExecutionId searchId,
GetAsyncSearchAction.Request request,
long nowInMillis,
ActionListener<AsyncSearchResponse> listener) {
store.getResponse(searchId, true,
new ActionListener<>() {
@Override
public void onResponse(AsyncSearchResponse response) {
sendFinalResponse(request, response, nowInMillis, listener);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}

private void sendFinalResponse(GetAsyncSearchAction.Request request,
AsyncSearchResponse response,
long nowInMillis,
ActionListener<AsyncSearchResponse> listener) {
// check if the result has expired
if (response.getExpirationTime() < nowInMillis) {
listener.onFailure(new ResourceNotFoundException(request.getId()));
return;
}

listener.onResponse(response);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
Expand Down Expand Up @@ -135,11 +136,11 @@ protected AsyncSearchResponse submitAsyncSearch(SubmitAsyncSearchRequest request
}

protected AsyncSearchResponse getAsyncSearch(String id) throws ExecutionException, InterruptedException {
return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncSearchAction.Request(id)).get();
return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncResultRequest(id)).get();
}

protected AsyncSearchResponse getAsyncSearch(String id, TimeValue keepAlive) throws ExecutionException, InterruptedException {
return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncSearchAction.Request(id).setKeepAlive(keepAlive)).get();
return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncResultRequest(id).setKeepAlive(keepAlive)).get();
}

protected AcknowledgedResponse deleteAsyncSearch(String id) throws ExecutionException, InterruptedException {
Expand Down Expand Up @@ -239,7 +240,7 @@ private AsyncSearchResponse doNext() throws Exception {
}
queryLatch.countDownAndReset();
AsyncSearchResponse newResponse = client().execute(GetAsyncSearchAction.INSTANCE,
new GetAsyncSearchAction.Request(response.getId())
new GetAsyncResultRequest(response.getId())
.setWaitForCompletion(TimeValue.timeValueMillis(10))).get();

if (newResponse.isRunning()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import java.util.List;

import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.search.GetAsyncSearchRequestTests.randomSearchId;
import static org.elasticsearch.xpack.core.async.GetAsyncResultRequestTests.randomSearchId;

public class AsyncSearchResponseTests extends ESTestCase {
private SearchResponse searchResponse = randomSearchResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.search.action.DeleteAsyncSearchAction;

import static org.elasticsearch.xpack.search.GetAsyncSearchRequestTests.randomSearchId;
import static org.elasticsearch.xpack.core.async.GetAsyncResultRequestTests.randomSearchId;

public class DeleteAsyncSearchRequestTests extends AbstractWireSerializingTestCase<DeleteAsyncSearchAction.Request> {
@Override
Expand Down

This file was deleted.

Loading