1414import org .elasticsearch .common .logging .Loggers ;
1515import org .elasticsearch .common .unit .ByteSizeUnit ;
1616import org .elasticsearch .common .unit .ByteSizeValue ;
17+ import org .elasticsearch .common .unit .TimeValue ;
18+ import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
19+ import org .elasticsearch .common .util .concurrent .FutureUtils ;
1720import org .elasticsearch .xpack .core .ml .MachineLearningField ;
1821import org .elasticsearch .xpack .core .ml .action .PutJobAction ;
1922import org .elasticsearch .xpack .core .ml .action .UpdateJobAction ;
3033import org .elasticsearch .xpack .core .ml .job .results .ForecastRequestStats ;
3134import org .elasticsearch .xpack .core .ml .job .results .Influencer ;
3235import org .elasticsearch .xpack .core .ml .job .results .ModelPlot ;
36+ import org .elasticsearch .xpack .ml .MachineLearning ;
3337import org .elasticsearch .xpack .ml .job .persistence .JobProvider ;
3438import org .elasticsearch .xpack .ml .job .persistence .JobResultsPersister ;
3539import org .elasticsearch .xpack .ml .job .process .autodetect .AutodetectProcess ;
4347import java .util .List ;
4448import java .util .Objects ;
4549import java .util .concurrent .CountDownLatch ;
50+ import java .util .concurrent .Future ;
4651import java .util .concurrent .Semaphore ;
4752import java .util .concurrent .TimeUnit ;
4853import java .util .concurrent .TimeoutException ;
@@ -71,6 +76,13 @@ public class AutoDetectResultProcessor {
7176
7277 private static final Logger LOGGER = Loggers .getLogger (AutoDetectResultProcessor .class );
7378
79+ /**
80+ * This is how far behind real-time we'll update the job with the latest established model memory.
81+ * If more updates are received during the delay period then they'll take precedence.
82+ * As a result there will be at most one update of established model memory per delay period.
83+ */
84+ private static final TimeValue ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY = TimeValue .timeValueSeconds (5 );
85+
7486 private final Client client ;
7587 private final Auditor auditor ;
7688 private final String jobId ;
@@ -90,8 +102,10 @@ public class AutoDetectResultProcessor {
90102 * New model size stats are read as the process is running
91103 */
92104 private volatile ModelSizeStats latestModelSizeStats ;
105+ private volatile Date latestDateForEstablishedModelMemoryCalc ;
93106 private volatile long latestEstablishedModelMemory ;
94107 private volatile boolean haveNewLatestModelSizeStats ;
108+ private Future <?> scheduledEstablishedModelMemoryUpdate ; // only accessed in synchronized methods
95109
96110 public AutoDetectResultProcessor (Client client , Auditor auditor , String jobId , Renormalizer renormalizer , JobResultsPersister persister ,
97111 JobProvider jobProvider , ModelSizeStats latestModelSizeStats , boolean restoredSnapshot ) {
@@ -148,6 +162,7 @@ public void process(AutodetectProcess process) {
148162 }
149163
150164 LOGGER .info ("[{}] {} buckets parsed from autodetect output" , jobId , bucketCount );
165+ runEstablishedModelMemoryUpdate (true );
151166 } catch (Exception e ) {
152167 failed = true ;
153168
@@ -194,15 +209,15 @@ void processResult(Context context, AutodetectResult result) {
194209 // persist after deleting interim results in case the new
195210 // results are also interim
196211 context .bulkResultsPersister .persistBucket (bucket ).executeRequest ();
212+ latestDateForEstablishedModelMemoryCalc = bucket .getTimestamp ();
197213 ++bucketCount ;
198214
199215 // if we haven't previously set established model memory, consider trying again after
200- // a reasonable amount of time has elapsed since the last model size stats update
216+ // a reasonable number of buckets have elapsed since the last model size stats update
201217 long minEstablishedTimespanMs = JobProvider .BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE * bucket .getBucketSpan () * 1000L ;
202- if (haveNewLatestModelSizeStats && latestEstablishedModelMemory == 0
203- && bucket .getTimestamp ().getTime () > latestModelSizeStats .getTimestamp ().getTime () + minEstablishedTimespanMs ) {
204- persister .commitResultWrites (context .jobId );
205- updateEstablishedModelMemoryOnJob (bucket .getTimestamp (), latestModelSizeStats );
218+ if (haveNewLatestModelSizeStats && latestEstablishedModelMemory == 0 && latestDateForEstablishedModelMemoryCalc .getTime ()
219+ > latestModelSizeStats .getTimestamp ().getTime () + minEstablishedTimespanMs ) {
220+ scheduleEstablishedModelMemoryUpdate (ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY );
206221 haveNewLatestModelSizeStats = false ;
207222 }
208223 }
@@ -293,15 +308,14 @@ private void processModelSizeStats(Context context, ModelSizeStats modelSizeStat
293308 persister .persistModelSizeStats (modelSizeStats );
294309 notifyModelMemoryStatusChange (context , modelSizeStats );
295310 latestModelSizeStats = modelSizeStats ;
311+ latestDateForEstablishedModelMemoryCalc = modelSizeStats .getTimestamp ();
296312 haveNewLatestModelSizeStats = true ;
297313
298314 // This is a crude way to NOT refresh the index and NOT attempt to update established model memory during the first 20 buckets
299315 // because this is when the model size stats are likely to be least stable and lots of updates will be coming through, and
300316 // we'll NEVER consider memory usage to be established during this period
301317 if (restoredSnapshot || bucketCount >= JobProvider .BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE ) {
302- // We need to make all results written up to and including these stats available for the established memory calculation
303- persister .commitResultWrites (context .jobId );
304- updateEstablishedModelMemoryOnJob (modelSizeStats .getTimestamp (), modelSizeStats );
318+ scheduleEstablishedModelMemoryUpdate (ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY );
305319 }
306320 }
307321
@@ -351,26 +365,91 @@ public void onFailure(Exception e) {
351365 });
352366 }
353367
354- private void updateEstablishedModelMemoryOnJob (Date latestBucketTimestamp , ModelSizeStats modelSizeStats ) {
355- jobProvider .getEstablishedMemoryUsage (jobId , latestBucketTimestamp , modelSizeStats , establishedModelMemory -> {
356- JobUpdate update = new JobUpdate .Builder (jobId )
357- .setEstablishedModelMemory (establishedModelMemory ).build ();
358- UpdateJobAction .Request updateRequest = UpdateJobAction .Request .internal (jobId , update );
359- updateRequest .setWaitForAck (false );
360-
361- executeAsyncWithOrigin (client , ML_ORIGIN , UpdateJobAction .INSTANCE , updateRequest , new ActionListener <PutJobAction .Response >() {
362- @ Override
363- public void onResponse (PutJobAction .Response response ) {
364- latestEstablishedModelMemory = establishedModelMemory ;
365- LOGGER .debug ("[{}] Updated job with established model memory [{}]" , jobId , establishedModelMemory );
366- }
368+ /**
369+ * The purpose of this method is to avoid saturating the cluster state update thread
370+ * when a lookback job is churning through buckets very fast and the memory usage of
371+ * the job is changing regularly. The idea is to only update the established model
372+ * memory associated with the job a few seconds after the new value has been received.
373+ * If more updates are received during the delay period then they simply replace the
374+ * value that originally caused the update to be scheduled. This rate limits cluster
375+ * state updates due to established model memory changing to one per job per delay period.
376+ * (In reality updates will only occur this rapidly during lookback. During real-time
377+ * operation the limit of one model size stats document per bucket will mean there is a
378+ * maximum of one cluster state update per job per bucket, and usually the bucket span
379+ * is 5 minutes or more.)
380+ * @param delay The delay before updating established model memory.
381+ */
382+ synchronized void scheduleEstablishedModelMemoryUpdate (TimeValue delay ) {
367383
368- @ Override
369- public void onFailure (Exception e ) {
370- LOGGER .error ("[" + jobId + "] Failed to update job with new established model memory [" + establishedModelMemory + "]" ,
371- e );
384+ if (scheduledEstablishedModelMemoryUpdate == null ) {
385+ try {
386+ scheduledEstablishedModelMemoryUpdate = client .threadPool ().schedule (delay , MachineLearning .UTILITY_THREAD_POOL_NAME ,
387+ () -> runEstablishedModelMemoryUpdate (false ));
388+ LOGGER .trace ("[{}] Scheduled established model memory update to run in [{}]" , jobId , delay );
389+ } catch (EsRejectedExecutionException e ) {
390+ if (e .isExecutorShutdown ()) {
391+ LOGGER .debug ("failed to schedule established model memory update; shutting down" , e );
392+ } else {
393+ throw e ;
372394 }
373- });
395+ }
396+ }
397+ }
398+
399+ /**
400+ * This method is called from two places:
401+ * - From the {@link Future} used for delayed updates
402+ * - When shutting down this result processor
403+ * When shutting down the result processor it's only necessary to do anything
404+ * if an update has been scheduled, but we want to do the update immediately.
405+ * Despite cancelling the scheduled update in this case, it's possible that
406+ * it's already started running, in which case this method will get called
407+ * twice in quick succession. But the second call will do nothing, as
408+ * <code>scheduledEstablishedModelMemoryUpdate</code> will have been reset
409+ * to <code>null</code> by the first call.
410+ */
411+ private synchronized void runEstablishedModelMemoryUpdate (boolean cancelExisting ) {
412+
413+ if (scheduledEstablishedModelMemoryUpdate != null ) {
414+ if (cancelExisting ) {
415+ LOGGER .debug ("[{}] Bringing forward previously scheduled established model memory update" , jobId );
416+ FutureUtils .cancel (scheduledEstablishedModelMemoryUpdate );
417+ }
418+ scheduledEstablishedModelMemoryUpdate = null ;
419+ updateEstablishedModelMemoryOnJob ();
420+ }
421+ }
422+
423+ private void updateEstablishedModelMemoryOnJob () {
424+
425+ // Copy these before committing writes, so the calculation is done based on committed documents
426+ Date latestBucketTimestamp = latestDateForEstablishedModelMemoryCalc ;
427+ ModelSizeStats modelSizeStatsForCalc = latestModelSizeStats ;
428+
429+ // We need to make all results written up to and including these stats available for the established memory calculation
430+ persister .commitResultWrites (jobId );
431+
432+ jobProvider .getEstablishedMemoryUsage (jobId , latestBucketTimestamp , modelSizeStatsForCalc , establishedModelMemory -> {
433+ if (latestEstablishedModelMemory != establishedModelMemory ) {
434+ JobUpdate update = new JobUpdate .Builder (jobId ).setEstablishedModelMemory (establishedModelMemory ).build ();
435+ UpdateJobAction .Request updateRequest = UpdateJobAction .Request .internal (jobId , update );
436+ updateRequest .setWaitForAck (false );
437+
438+ executeAsyncWithOrigin (client , ML_ORIGIN , UpdateJobAction .INSTANCE , updateRequest ,
439+ new ActionListener <PutJobAction .Response >() {
440+ @ Override
441+ public void onResponse (PutJobAction .Response response ) {
442+ latestEstablishedModelMemory = establishedModelMemory ;
443+ LOGGER .debug ("[{}] Updated job with established model memory [{}]" , jobId , establishedModelMemory );
444+ }
445+
446+ @ Override
447+ public void onFailure (Exception e ) {
448+ LOGGER .error ("[" + jobId + "] Failed to update job with new established model memory [" +
449+ establishedModelMemory + "]" , e );
450+ }
451+ });
452+ }
374453 }, e -> LOGGER .error ("[" + jobId + "] Failed to calculate established model memory" , e ));
375454 }
376455
0 commit comments