66 */
77package org .elasticsearch .xpack .graph .action ;
88
9+ import org .apache .logging .log4j .LogManager ;
10+ import org .apache .logging .log4j .Logger ;
911import org .apache .lucene .search .BooleanQuery ;
1012import org .apache .lucene .util .BytesRef ;
1113import org .apache .lucene .util .PriorityQueue ;
6163import java .util .Set ;
6264import java .util .SortedSet ;
6365import java .util .TreeSet ;
64- import java .util .concurrent .atomic .AtomicBoolean ;
6566
6667/**
6768 * Performs a series of elasticsearch queries and aggregations to explore
6869 * connected terms in a single index.
6970 */
7071public class TransportGraphExploreAction extends HandledTransportAction <GraphExploreRequest , GraphExploreResponse > {
72+ private static final Logger logger = LogManager .getLogger (TransportGraphExploreAction .class );
7173
7274 private final ThreadPool threadPool ;
7375 private final NodeClient client ;
@@ -115,7 +117,6 @@ class AsyncGraphAction {
115117 private final ActionListener <GraphExploreResponse > listener ;
116118
117119 private final long startTime ;
118- private final AtomicBoolean timedOut ;
119120 private volatile ShardOperationFailedException [] shardFailures ;
120121 private Map <VertexId , Vertex > vertices = new HashMap <>();
121122 private Map <ConnectionId , Connection > connections = new HashMap <>();
@@ -128,7 +129,6 @@ class AsyncGraphAction {
128129 this .request = request ;
129130 this .listener = listener ;
130131 this .startTime = threadPool .relativeTimeInMillis ();
131- this .timedOut = new AtomicBoolean (false );
132132 this .shardFailures = ShardSearchFailure .EMPTY_ARRAY ;
133133 }
134134
@@ -173,16 +173,11 @@ private void removeVertex(Vertex vertex) {
173173 * connections
174174 */
175175 synchronized void expand () {
176- if (hasTimedOut ()) {
177- timedOut .set (true );
178- listener .onResponse (buildResponse ());
179- return ;
180- }
181176 Map <String , Set <Vertex >> lastHopFindings = hopFindings .get (currentHopNumber );
182177 if ((currentHopNumber >= (request .getHopNumbers () - 1 )) || (lastHopFindings == null ) || (lastHopFindings .size () == 0 )) {
183178 // Either we gathered no leads from the last hop or we have
184179 // reached the final hop
185- listener .onResponse (buildResponse ());
180+ listener .onResponse (buildResponse (false ));
186181 return ;
187182 }
188183 Hop lastHop = request .getHop (currentHopNumber );
@@ -318,16 +313,22 @@ synchronized void expand() {
318313 // Execute the search
319314 SearchSourceBuilder source = new SearchSourceBuilder ().query (rootBool ).aggregation (sampleAgg ).size (0 );
320315 if (request .timeout () != null ) {
321- source .timeout (TimeValue .timeValueMillis (timeRemainingMillis ()));
316+ // Actual resolution of timer is granularity of the interval
317+ // configured globally for updating estimated time.
318+ long timeRemainingMillis = startTime + request .timeout ().millis () - threadPool .relativeTimeInMillis ();
319+ if (timeRemainingMillis <= 0 ) {
320+ listener .onResponse (buildResponse (true ));
321+ return ;
322+ }
323+
324+ source .timeout (TimeValue .timeValueMillis (timeRemainingMillis ));
322325 }
323326 searchRequest .source (source );
324327
325- // System.out.println(source);
326328 logger .trace ("executing expansion graph search request" );
327329 client .search (searchRequest , new ActionListener .Delegating <>(listener ) {
328330 @ Override
329331 public void onResponse (SearchResponse searchResponse ) {
330- // System.out.println(searchResponse);
331332 addShardFailures (searchResponse .getShardFailures ());
332333
333334 ArrayList <Connection > newConnections = new ArrayList <Connection >();
@@ -676,7 +677,6 @@ public synchronized void start() {
676677 source .timeout (request .timeout ());
677678 }
678679 searchRequest .source (source );
679- // System.out.println(source);
680680 logger .trace ("executing initial graph search request" );
681681 client .search (searchRequest , new ActionListener .Delegating <>(listener ) {
682682 @ Override
@@ -774,16 +774,6 @@ private void addNormalizedBoosts(BoolQueryBuilder includesContainer, VertexReque
774774 }
775775 }
776776
777- boolean hasTimedOut () {
778- return request .timeout () != null && (timeRemainingMillis () <= 0 );
779- }
780-
781- long timeRemainingMillis () {
782- // Actual resolution of timer is granularity of the interval
783- // configured globally for updating estimated time.
784- return (startTime + request .timeout ().millis ()) - threadPool .relativeTimeInMillis ();
785- }
786-
787777 void addShardFailures (ShardOperationFailedException [] failures ) {
788778 if (CollectionUtils .isEmpty (failures ) == false ) {
789779 ShardOperationFailedException [] duplicates = new ShardOperationFailedException [shardFailures .length + failures .length ];
@@ -793,9 +783,9 @@ void addShardFailures(ShardOperationFailedException[] failures) {
793783 }
794784 }
795785
796- protected GraphExploreResponse buildResponse () {
786+ protected GraphExploreResponse buildResponse (boolean timedOut ) {
797787 long took = threadPool .relativeTimeInMillis () - startTime ;
798- return new GraphExploreResponse (took , timedOut . get () , shardFailures , vertices , connections , request .returnDetailedInfo ());
788+ return new GraphExploreResponse (took , timedOut , shardFailures , vertices , connections , request .returnDetailedInfo ());
799789 }
800790
801791 }
0 commit comments