From 0b52ec8d3fd39a9600142f9d5c042c1e4b67aa8f Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 29 Jul 2022 11:05:03 -0400 Subject: [PATCH 1/2] Graph: fix race condition in timeout Previously `graph` checked if the request timed out, then spent some time doing work, then passed the timeout on to the next request. Over and over again. It's quite possible that the response may not have timed out for the first check but would have timed out for the second check. This manifests as the timeout being sent to the next hop being a negative number of milliseconds. We don't allow this sort of thing. This fixes this by moving the timeout check to the same spot it is read for setting the timeout on the next request - we just check if its `> 0` to find the timeouts. This does keep the request running slightly longer after it's officially timed out - but it's just long enough to prepare the next layer of request. Usually microseconds. Which should be fine. Closes #55396 --- .../xpack/graph/test/GraphTests.java | 5 +-- .../action/TransportGraphExploreAction.java | 40 +++++++------------ 2 files changed, 17 insertions(+), 28 deletions(-) diff --git a/x-pack/plugin/graph/src/internalClusterTest/java/org/elasticsearch/xpack/graph/test/GraphTests.java b/x-pack/plugin/graph/src/internalClusterTest/java/org/elasticsearch/xpack/graph/test/GraphTests.java index 7623fffa777f5..e9178675bd1a6 100644 --- a/x-pack/plugin/graph/src/internalClusterTest/java/org/elasticsearch/xpack/graph/test/GraphTests.java +++ b/x-pack/plugin/graph/src/internalClusterTest/java/org/elasticsearch/xpack/graph/test/GraphTests.java @@ -222,19 +222,18 @@ public void testPopularityQueryCrawl() { assertNull("Elvis is a 3rd tier connection so should not be returned here", response.getVertex(Vertex.createId("people", "elvis"))); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55396") public void testTimedoutQueryCrawl() { GraphExploreRequestBuilder grb = new GraphExploreRequestBuilder(client(), GraphExploreAction.INSTANCE).setIndices("test"); grb.setTimeout(TimeValue.timeValueMillis(400)); Hop hop1 = grb.createNextHop(QueryBuilders.termQuery("description", "beatles")); hop1.addVertexRequest("people").size(10).minDocCount(1); // members of beatles - // 00s friends of beatles - grb.createNextHop(QueryBuilders.termQuery("decade", "00s")).addVertexRequest("people").size(100).minDocCount(1); // A query that should cause a timeout ScriptQueryBuilder timeoutQuery = QueryBuilders.scriptQuery( new Script(ScriptType.INLINE, "mockscript", "graph_timeout", Collections.emptyMap()) ); grb.createNextHop(timeoutQuery).addVertexRequest("people").size(100).minDocCount(1); + // 00s friends of beatles + grb.createNextHop(QueryBuilders.termQuery("decade", "00s")).addVertexRequest("people").size(100).minDocCount(1); GraphExploreResponse response = grb.get(); assertTrue(response.isTimedOut()); diff --git a/x-pack/plugin/graph/src/main/java/org/elasticsearch/xpack/graph/action/TransportGraphExploreAction.java b/x-pack/plugin/graph/src/main/java/org/elasticsearch/xpack/graph/action/TransportGraphExploreAction.java index b50cd54bbbe40..93c28f63b0e8b 100644 --- a/x-pack/plugin/graph/src/main/java/org/elasticsearch/xpack/graph/action/TransportGraphExploreAction.java +++ b/x-pack/plugin/graph/src/main/java/org/elasticsearch/xpack/graph/action/TransportGraphExploreAction.java @@ -6,6 +6,8 @@ */ package org.elasticsearch.xpack.graph.action; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.PriorityQueue; @@ -61,13 +63,13 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicBoolean; /** * Performs a series of elasticsearch queries and aggregations to explore * connected terms in a single index. */ public class TransportGraphExploreAction extends HandledTransportAction { + private static final Logger logger = LogManager.getLogger(TransportGraphExploreAction.class); private final ThreadPool threadPool; private final NodeClient client; @@ -115,7 +117,6 @@ class AsyncGraphAction { private final ActionListener listener; private final long startTime; - private final AtomicBoolean timedOut; private volatile ShardOperationFailedException[] shardFailures; private Map vertices = new HashMap<>(); private Map connections = new HashMap<>(); @@ -128,7 +129,6 @@ class AsyncGraphAction { this.request = request; this.listener = listener; this.startTime = threadPool.relativeTimeInMillis(); - this.timedOut = new AtomicBoolean(false); this.shardFailures = ShardSearchFailure.EMPTY_ARRAY; } @@ -173,16 +173,11 @@ private void removeVertex(Vertex vertex) { * connections */ synchronized void expand() { - if (hasTimedOut()) { - timedOut.set(true); - listener.onResponse(buildResponse()); - return; - } Map> lastHopFindings = hopFindings.get(currentHopNumber); if ((currentHopNumber >= (request.getHopNumbers() - 1)) || (lastHopFindings == null) || (lastHopFindings.size() == 0)) { // Either we gathered no leads from the last hop or we have // reached the final hop - listener.onResponse(buildResponse()); + listener.onResponse(buildResponse(false)); return; } Hop lastHop = request.getHop(currentHopNumber); @@ -318,16 +313,22 @@ synchronized void expand() { // Execute the search SearchSourceBuilder source = new SearchSourceBuilder().query(rootBool).aggregation(sampleAgg).size(0); if (request.timeout() != null) { - source.timeout(TimeValue.timeValueMillis(timeRemainingMillis())); + // Actual resolution of timer is granularity of the interval + // configured globally for updating estimated time. + long timeRemainingMillis = startTime + request.timeout().millis() - threadPool.relativeTimeInMillis(); + if (timeRemainingMillis <= 0) { + listener.onResponse(buildResponse(true)); + return; + } + + source.timeout(TimeValue.timeValueMillis(timeRemainingMillis)); } searchRequest.source(source); - // System.out.println(source); logger.trace("executing expansion graph search request"); client.search(searchRequest, new ActionListener.Delegating<>(listener) { @Override public void onResponse(SearchResponse searchResponse) { - // System.out.println(searchResponse); addShardFailures(searchResponse.getShardFailures()); ArrayList newConnections = new ArrayList(); @@ -676,7 +677,6 @@ public synchronized void start() { source.timeout(request.timeout()); } searchRequest.source(source); - // System.out.println(source); logger.trace("executing initial graph search request"); client.search(searchRequest, new ActionListener.Delegating<>(listener) { @Override @@ -774,16 +774,6 @@ private void addNormalizedBoosts(BoolQueryBuilder includesContainer, VertexReque } } - boolean hasTimedOut() { - return request.timeout() != null && (timeRemainingMillis() <= 0); - } - - long timeRemainingMillis() { - // Actual resolution of timer is granularity of the interval - // configured globally for updating estimated time. - return (startTime + request.timeout().millis()) - threadPool.relativeTimeInMillis(); - } - void addShardFailures(ShardOperationFailedException[] failures) { if (CollectionUtils.isEmpty(failures) == false) { ShardOperationFailedException[] duplicates = new ShardOperationFailedException[shardFailures.length + failures.length]; @@ -793,9 +783,9 @@ void addShardFailures(ShardOperationFailedException[] failures) { } } - protected GraphExploreResponse buildResponse() { + protected GraphExploreResponse buildResponse(boolean timedOut) { long took = threadPool.relativeTimeInMillis() - startTime; - return new GraphExploreResponse(took, timedOut.get(), shardFailures, vertices, connections, request.returnDetailedInfo()); + return new GraphExploreResponse(took, timedOut, shardFailures, vertices, connections, request.returnDetailedInfo()); } } From 5c25b333a7e40e0d41a4d4eb8cf41521f17eb3b9 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Fri, 29 Jul 2022 10:58:10 -0400 Subject: [PATCH 2/2] Update docs/changelog/88946.yaml --- docs/changelog/88946.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/88946.yaml diff --git a/docs/changelog/88946.yaml b/docs/changelog/88946.yaml new file mode 100644 index 0000000000000..ae853f2e5ffa9 --- /dev/null +++ b/docs/changelog/88946.yaml @@ -0,0 +1,6 @@ +pr: 88946 +summary: "Graph: fix race condition in timeout" +area: Graph +type: bug +issues: + - 55396