Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/88946.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 88946
summary: "Graph: fix race condition in timeout"
area: Graph
type: bug
issues:
- 55396
Original file line number Diff line number Diff line change
Expand Up @@ -222,19 +222,18 @@ public void testPopularityQueryCrawl() {
assertNull("Elvis is a 3rd tier connection so should not be returned here", response.getVertex(Vertex.createId("people", "elvis")));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55396")
public void testTimedoutQueryCrawl() {
GraphExploreRequestBuilder grb = new GraphExploreRequestBuilder(client(), GraphExploreAction.INSTANCE).setIndices("test");
grb.setTimeout(TimeValue.timeValueMillis(400));
Hop hop1 = grb.createNextHop(QueryBuilders.termQuery("description", "beatles"));
hop1.addVertexRequest("people").size(10).minDocCount(1); // members of beatles
// 00s friends of beatles
grb.createNextHop(QueryBuilders.termQuery("decade", "00s")).addVertexRequest("people").size(100).minDocCount(1);
// A query that should cause a timeout
ScriptQueryBuilder timeoutQuery = QueryBuilders.scriptQuery(
new Script(ScriptType.INLINE, "mockscript", "graph_timeout", Collections.emptyMap())
);
grb.createNextHop(timeoutQuery).addVertexRequest("people").size(100).minDocCount(1);
// 00s friends of beatles
grb.createNextHop(QueryBuilders.termQuery("decade", "00s")).addVertexRequest("people").size(100).minDocCount(1);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to move the script query causing the timeout to the hop before the last hop because we no longer check the timeout on the final response. If we get a full response from the query we return it even if we're above the timeout time.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figured that was fine because it should be quite fast.


GraphExploreResponse response = grb.get();
assertTrue(response.isTimedOut());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
*/
package org.elasticsearch.xpack.graph.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.PriorityQueue;
Expand Down Expand Up @@ -61,13 +63,13 @@
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Performs a series of elasticsearch queries and aggregations to explore
* connected terms in a single index.
*/
public class TransportGraphExploreAction extends HandledTransportAction<GraphExploreRequest, GraphExploreResponse> {
private static final Logger logger = LogManager.getLogger(TransportGraphExploreAction.class);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logger this one was using was deprecated.


private final ThreadPool threadPool;
private final NodeClient client;
Expand Down Expand Up @@ -115,7 +117,6 @@ class AsyncGraphAction {
private final ActionListener<GraphExploreResponse> listener;

private final long startTime;
private final AtomicBoolean timedOut;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was pretty unnecessary. We only ever set it one time so I just pass it in.

private volatile ShardOperationFailedException[] shardFailures;
private Map<VertexId, Vertex> vertices = new HashMap<>();
private Map<ConnectionId, Connection> connections = new HashMap<>();
Expand All @@ -128,7 +129,6 @@ class AsyncGraphAction {
this.request = request;
this.listener = listener;
this.startTime = threadPool.relativeTimeInMillis();
this.timedOut = new AtomicBoolean(false);
this.shardFailures = ShardSearchFailure.EMPTY_ARRAY;
}

Expand Down Expand Up @@ -173,16 +173,11 @@ private void removeVertex(Vertex vertex) {
* connections
*/
synchronized void expand() {
if (hasTimedOut()) {
timedOut.set(true);
listener.onResponse(buildResponse());
return;
}
Map<String, Set<Vertex>> lastHopFindings = hopFindings.get(currentHopNumber);
if ((currentHopNumber >= (request.getHopNumbers() - 1)) || (lastHopFindings == null) || (lastHopFindings.size() == 0)) {
// Either we gathered no leads from the last hop or we have
// reached the final hop
listener.onResponse(buildResponse());
listener.onResponse(buildResponse(false));
return;
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's the bit where we return the response even if we're over time. If we're done we may as well return anyway, I say.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine for me

Hop lastHop = request.getHop(currentHopNumber);
Expand Down Expand Up @@ -318,16 +313,22 @@ synchronized void expand() {
// Execute the search
SearchSourceBuilder source = new SearchSourceBuilder().query(rootBool).aggregation(sampleAgg).size(0);
if (request.timeout() != null) {
source.timeout(TimeValue.timeValueMillis(timeRemainingMillis()));
// Actual resolution of timer is granularity of the interval
// configured globally for updating estimated time.
long timeRemainingMillis = startTime + request.timeout().millis() - threadPool.relativeTimeInMillis();
if (timeRemainingMillis <= 0) {
listener.onResponse(buildResponse(true));
return;
}

source.timeout(TimeValue.timeValueMillis(timeRemainingMillis));
}
searchRequest.source(source);

// System.out.println(source);
logger.trace("executing expansion graph search request");
client.search(searchRequest, new ActionListener.Delegating<>(listener) {
@Override
public void onResponse(SearchResponse searchResponse) {
// System.out.println(searchResponse);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These just seemed like leftovers. I figure we'd dump these into the trace logs if we want them.

addShardFailures(searchResponse.getShardFailures());

ArrayList<Connection> newConnections = new ArrayList<Connection>();
Expand Down Expand Up @@ -676,7 +677,6 @@ public synchronized void start() {
source.timeout(request.timeout());
}
searchRequest.source(source);
// System.out.println(source);
logger.trace("executing initial graph search request");
client.search(searchRequest, new ActionListener.Delegating<>(listener) {
@Override
Expand Down Expand Up @@ -774,16 +774,6 @@ private void addNormalizedBoosts(BoolQueryBuilder includesContainer, VertexReque
}
}

boolean hasTimedOut() {
return request.timeout() != null && (timeRemainingMillis() <= 0);
}

long timeRemainingMillis() {
// Actual resolution of timer is granularity of the interval
// configured globally for updating estimated time.
return (startTime + request.timeout().millis()) - threadPool.relativeTimeInMillis();
}

void addShardFailures(ShardOperationFailedException[] failures) {
if (CollectionUtils.isEmpty(failures) == false) {
ShardOperationFailedException[] duplicates = new ShardOperationFailedException[shardFailures.length + failures.length];
Expand All @@ -793,9 +783,9 @@ void addShardFailures(ShardOperationFailedException[] failures) {
}
}

protected GraphExploreResponse buildResponse() {
protected GraphExploreResponse buildResponse(boolean timedOut) {
long took = threadPool.relativeTimeInMillis() - startTime;
return new GraphExploreResponse(took, timedOut.get(), shardFailures, vertices, connections, request.returnDetailedInfo());
return new GraphExploreResponse(took, timedOut, shardFailures, vertices, connections, request.returnDetailedInfo());
}

}
Expand Down