diff --git a/docs/changelog/88946.yaml b/docs/changelog/88946.yaml new file mode 100644 index 0000000000000..ae853f2e5ffa9 --- /dev/null +++ b/docs/changelog/88946.yaml @@ -0,0 +1,6 @@ +pr: 88946 +summary: "Graph: fix race condition in timeout" +area: Graph +type: bug +issues: + - 55396 diff --git a/x-pack/plugin/graph/src/internalClusterTest/java/org/elasticsearch/xpack/graph/test/GraphTests.java b/x-pack/plugin/graph/src/internalClusterTest/java/org/elasticsearch/xpack/graph/test/GraphTests.java index 7623fffa777f5..e9178675bd1a6 100644 --- a/x-pack/plugin/graph/src/internalClusterTest/java/org/elasticsearch/xpack/graph/test/GraphTests.java +++ b/x-pack/plugin/graph/src/internalClusterTest/java/org/elasticsearch/xpack/graph/test/GraphTests.java @@ -222,19 +222,18 @@ public void testPopularityQueryCrawl() { assertNull("Elvis is a 3rd tier connection so should not be returned here", response.getVertex(Vertex.createId("people", "elvis"))); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55396") public void testTimedoutQueryCrawl() { GraphExploreRequestBuilder grb = new GraphExploreRequestBuilder(client(), GraphExploreAction.INSTANCE).setIndices("test"); grb.setTimeout(TimeValue.timeValueMillis(400)); Hop hop1 = grb.createNextHop(QueryBuilders.termQuery("description", "beatles")); hop1.addVertexRequest("people").size(10).minDocCount(1); // members of beatles - // 00s friends of beatles - grb.createNextHop(QueryBuilders.termQuery("decade", "00s")).addVertexRequest("people").size(100).minDocCount(1); // A query that should cause a timeout ScriptQueryBuilder timeoutQuery = QueryBuilders.scriptQuery( new Script(ScriptType.INLINE, "mockscript", "graph_timeout", Collections.emptyMap()) ); grb.createNextHop(timeoutQuery).addVertexRequest("people").size(100).minDocCount(1); + // 00s friends of beatles + grb.createNextHop(QueryBuilders.termQuery("decade", "00s")).addVertexRequest("people").size(100).minDocCount(1); GraphExploreResponse response = grb.get(); assertTrue(response.isTimedOut()); diff --git a/x-pack/plugin/graph/src/main/java/org/elasticsearch/xpack/graph/action/TransportGraphExploreAction.java b/x-pack/plugin/graph/src/main/java/org/elasticsearch/xpack/graph/action/TransportGraphExploreAction.java index b50cd54bbbe40..93c28f63b0e8b 100644 --- a/x-pack/plugin/graph/src/main/java/org/elasticsearch/xpack/graph/action/TransportGraphExploreAction.java +++ b/x-pack/plugin/graph/src/main/java/org/elasticsearch/xpack/graph/action/TransportGraphExploreAction.java @@ -6,6 +6,8 @@ */ package org.elasticsearch.xpack.graph.action; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.PriorityQueue; @@ -61,13 +63,13 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicBoolean; /** * Performs a series of elasticsearch queries and aggregations to explore * connected terms in a single index. */ public class TransportGraphExploreAction extends HandledTransportAction { + private static final Logger logger = LogManager.getLogger(TransportGraphExploreAction.class); private final ThreadPool threadPool; private final NodeClient client; @@ -115,7 +117,6 @@ class AsyncGraphAction { private final ActionListener listener; private final long startTime; - private final AtomicBoolean timedOut; private volatile ShardOperationFailedException[] shardFailures; private Map vertices = new HashMap<>(); private Map connections = new HashMap<>(); @@ -128,7 +129,6 @@ class AsyncGraphAction { this.request = request; this.listener = listener; this.startTime = threadPool.relativeTimeInMillis(); - this.timedOut = new AtomicBoolean(false); this.shardFailures = ShardSearchFailure.EMPTY_ARRAY; } @@ -173,16 +173,11 @@ private void removeVertex(Vertex vertex) { * connections */ synchronized void expand() { - if (hasTimedOut()) { - timedOut.set(true); - listener.onResponse(buildResponse()); - return; - } Map> lastHopFindings = hopFindings.get(currentHopNumber); if ((currentHopNumber >= (request.getHopNumbers() - 1)) || (lastHopFindings == null) || (lastHopFindings.size() == 0)) { // Either we gathered no leads from the last hop or we have // reached the final hop - listener.onResponse(buildResponse()); + listener.onResponse(buildResponse(false)); return; } Hop lastHop = request.getHop(currentHopNumber); @@ -318,16 +313,22 @@ synchronized void expand() { // Execute the search SearchSourceBuilder source = new SearchSourceBuilder().query(rootBool).aggregation(sampleAgg).size(0); if (request.timeout() != null) { - source.timeout(TimeValue.timeValueMillis(timeRemainingMillis())); + // Actual resolution of timer is granularity of the interval + // configured globally for updating estimated time. + long timeRemainingMillis = startTime + request.timeout().millis() - threadPool.relativeTimeInMillis(); + if (timeRemainingMillis <= 0) { + listener.onResponse(buildResponse(true)); + return; + } + + source.timeout(TimeValue.timeValueMillis(timeRemainingMillis)); } searchRequest.source(source); - // System.out.println(source); logger.trace("executing expansion graph search request"); client.search(searchRequest, new ActionListener.Delegating<>(listener) { @Override public void onResponse(SearchResponse searchResponse) { - // System.out.println(searchResponse); addShardFailures(searchResponse.getShardFailures()); ArrayList newConnections = new ArrayList(); @@ -676,7 +677,6 @@ public synchronized void start() { source.timeout(request.timeout()); } searchRequest.source(source); - // System.out.println(source); logger.trace("executing initial graph search request"); client.search(searchRequest, new ActionListener.Delegating<>(listener) { @Override @@ -774,16 +774,6 @@ private void addNormalizedBoosts(BoolQueryBuilder includesContainer, VertexReque } } - boolean hasTimedOut() { - return request.timeout() != null && (timeRemainingMillis() <= 0); - } - - long timeRemainingMillis() { - // Actual resolution of timer is granularity of the interval - // configured globally for updating estimated time. - return (startTime + request.timeout().millis()) - threadPool.relativeTimeInMillis(); - } - void addShardFailures(ShardOperationFailedException[] failures) { if (CollectionUtils.isEmpty(failures) == false) { ShardOperationFailedException[] duplicates = new ShardOperationFailedException[shardFailures.length + failures.length]; @@ -793,9 +783,9 @@ void addShardFailures(ShardOperationFailedException[] failures) { } } - protected GraphExploreResponse buildResponse() { + protected GraphExploreResponse buildResponse(boolean timedOut) { long took = threadPool.relativeTimeInMillis() - startTime; - return new GraphExploreResponse(took, timedOut.get(), shardFailures, vertices, connections, request.returnDetailedInfo()); + return new GraphExploreResponse(took, timedOut, shardFailures, vertices, connections, request.returnDetailedInfo()); } }