4444import org .elasticsearch .xpack .core .ml .dataframe .DataFrameAnalyticsTaskState ;
4545import org .elasticsearch .xpack .core .ml .dataframe .stats .AnalysisStats ;
4646import org .elasticsearch .xpack .core .ml .dataframe .stats .Fields ;
47- import org .elasticsearch .xpack .core .ml .dataframe .stats .common .MemoryUsage ;
4847import org .elasticsearch .xpack .core .ml .dataframe .stats .classification .ClassificationStats ;
4948import org .elasticsearch .xpack .core .ml .dataframe .stats .common .DataCounts ;
49+ import org .elasticsearch .xpack .core .ml .dataframe .stats .common .MemoryUsage ;
5050import org .elasticsearch .xpack .core .ml .dataframe .stats .outlierdetection .OutlierDetectionStats ;
5151import org .elasticsearch .xpack .core .ml .dataframe .stats .regression .RegressionStats ;
5252import org .elasticsearch .xpack .core .ml .job .persistence .AnomalyDetectorsIndex ;
5555import org .elasticsearch .xpack .ml .dataframe .DataFrameAnalyticsTask ;
5656import org .elasticsearch .xpack .ml .dataframe .StoredProgress ;
5757import org .elasticsearch .xpack .ml .dataframe .stats .ProgressTracker ;
58- import org .elasticsearch .xpack .ml .dataframe .stats .StatsHolder ;
5958import org .elasticsearch .xpack .ml .utils .persistence .MlParserUtils ;
6059
6160import java .util .ArrayList ;
@@ -105,25 +104,20 @@ protected void taskOperation(GetDataFrameAnalyticsStatsAction.Request request, D
105104 ActionListener <QueryPage <Stats >> listener ) {
106105 logger .debug ("Get stats for running task [{}]" , task .getParams ().getId ());
107106
108- ActionListener <StatsHolder > statsHolderListener = ActionListener .wrap (
109- statsHolder -> {
107+ ActionListener <Void > reindexingProgressListener = ActionListener .wrap (
108+ aVoid -> {
110109 Stats stats = buildStats (
111110 task .getParams ().getId (),
112- statsHolder .getProgressTracker ().report (),
113- statsHolder .getDataCountsTracker ().report (task .getParams ().getId ()),
114- statsHolder .getMemoryUsage (),
115- statsHolder .getAnalysisStats ()
111+ task . getStatsHolder () .getProgressTracker ().report (),
112+ task . getStatsHolder () .getDataCountsTracker ().report (task .getParams ().getId ()),
113+ task . getStatsHolder () .getMemoryUsage (),
114+ task . getStatsHolder () .getAnalysisStats ()
116115 );
117116 listener .onResponse (new QueryPage <>(Collections .singletonList (stats ), 1 ,
118117 GetDataFrameAnalyticsAction .Response .RESULTS_FIELD ));
119118 }, listener ::onFailure
120119 );
121120
122- ActionListener <Void > reindexingProgressListener = ActionListener .wrap (
123- aVoid -> statsHolderListener .onResponse (task .getStatsHolder ()),
124- listener ::onFailure
125- );
126-
127121 task .updateReindexTaskProgress (reindexingProgressListener );
128122 }
129123
@@ -138,7 +132,7 @@ protected void doExecute(Task task, GetDataFrameAnalyticsStatsAction.Request req
138132 .collect (Collectors .toList ());
139133 request .setExpandedIds (expandedIds );
140134 ActionListener <GetDataFrameAnalyticsStatsAction .Response > runningTasksStatsListener = ActionListener .wrap (
141- runningTasksStatsResponse -> gatherStatsForStoppedTasks (request . getExpandedIds (), runningTasksStatsResponse ,
135+ runningTasksStatsResponse -> gatherStatsForStoppedTasks (getResponse . getResources (). results (), runningTasksStatsResponse ,
142136 ActionListener .wrap (
143137 finalResponse -> {
144138
@@ -163,20 +157,20 @@ protected void doExecute(Task task, GetDataFrameAnalyticsStatsAction.Request req
163157 executeAsyncWithOrigin (client , ML_ORIGIN , GetDataFrameAnalyticsAction .INSTANCE , getRequest , getResponseListener );
164158 }
165159
166- void gatherStatsForStoppedTasks (List <String > expandedIds , GetDataFrameAnalyticsStatsAction .Response runningTasksResponse ,
160+ void gatherStatsForStoppedTasks (List <DataFrameAnalyticsConfig > configs , GetDataFrameAnalyticsStatsAction .Response runningTasksResponse ,
167161 ActionListener <GetDataFrameAnalyticsStatsAction .Response > listener ) {
168- List <String > stoppedTasksIds = determineStoppedTasksIds ( expandedIds , runningTasksResponse .getResponse ().results ());
169- if (stoppedTasksIds .isEmpty ()) {
162+ List <DataFrameAnalyticsConfig > stoppedConfigs = determineStoppedConfigs ( configs , runningTasksResponse .getResponse ().results ());
163+ if (stoppedConfigs .isEmpty ()) {
170164 listener .onResponse (runningTasksResponse );
171165 return ;
172166 }
173167
174- AtomicInteger counter = new AtomicInteger (stoppedTasksIds .size ());
175- AtomicArray <Stats > jobStats = new AtomicArray <>(stoppedTasksIds .size ());
176- for (int i = 0 ; i < stoppedTasksIds .size (); i ++) {
168+ AtomicInteger counter = new AtomicInteger (stoppedConfigs .size ());
169+ AtomicArray <Stats > jobStats = new AtomicArray <>(stoppedConfigs .size ());
170+ for (int i = 0 ; i < stoppedConfigs .size (); i ++) {
177171 final int slot = i ;
178- String jobId = stoppedTasksIds .get (i );
179- searchStats (jobId , ActionListener .wrap (
172+ DataFrameAnalyticsConfig config = stoppedConfigs .get (i );
173+ searchStats (config , ActionListener .wrap (
180174 stats -> {
181175 jobStats .set (slot , stats );
182176 if (counter .decrementAndGet () == 0 ) {
@@ -192,21 +186,24 @@ void gatherStatsForStoppedTasks(List<String> expandedIds, GetDataFrameAnalyticsS
192186 }
193187 }
194188
195- static List <String > determineStoppedTasksIds (List <String > expandedIds , List <Stats > runningTasksStats ) {
189+ static List <DataFrameAnalyticsConfig > determineStoppedConfigs (List <DataFrameAnalyticsConfig > configs , List <Stats > runningTasksStats ) {
196190 Set <String > startedTasksIds = runningTasksStats .stream ().map (Stats ::getId ).collect (Collectors .toSet ());
197- return expandedIds .stream ().filter (id -> startedTasksIds .contains (id ) == false ).collect (Collectors .toList ());
191+ return configs .stream ().filter (config -> startedTasksIds .contains (config . getId () ) == false ).collect (Collectors .toList ());
198192 }
199193
200- private void searchStats (String configId , ActionListener <Stats > listener ) {
201- RetrievedStatsHolder retrievedStatsHolder = new RetrievedStatsHolder ();
194+ private void searchStats (DataFrameAnalyticsConfig config , ActionListener <Stats > listener ) {
195+ logger .debug ("[{}] Gathering stats for stopped task" , config .getId ());
196+
197+ RetrievedStatsHolder retrievedStatsHolder = new RetrievedStatsHolder (
198+ ProgressTracker .fromZeroes (config .getAnalysis ().getProgressPhases ()).report ());
202199
203200 MultiSearchRequest multiSearchRequest = new MultiSearchRequest ();
204- multiSearchRequest .add (buildStoredProgressSearch (configId ));
205- multiSearchRequest .add (buildStatsDocSearch (configId , DataCounts .TYPE_VALUE ));
206- multiSearchRequest .add (buildStatsDocSearch (configId , MemoryUsage .TYPE_VALUE ));
207- multiSearchRequest .add (buildStatsDocSearch (configId , OutlierDetectionStats .TYPE_VALUE ));
208- multiSearchRequest .add (buildStatsDocSearch (configId , ClassificationStats .TYPE_VALUE ));
209- multiSearchRequest .add (buildStatsDocSearch (configId , RegressionStats .TYPE_VALUE ));
201+ multiSearchRequest .add (buildStoredProgressSearch (config . getId () ));
202+ multiSearchRequest .add (buildStatsDocSearch (config . getId () , DataCounts .TYPE_VALUE ));
203+ multiSearchRequest .add (buildStatsDocSearch (config . getId () , MemoryUsage .TYPE_VALUE ));
204+ multiSearchRequest .add (buildStatsDocSearch (config . getId () , OutlierDetectionStats .TYPE_VALUE ));
205+ multiSearchRequest .add (buildStatsDocSearch (config . getId () , ClassificationStats .TYPE_VALUE ));
206+ multiSearchRequest .add (buildStatsDocSearch (config . getId () , RegressionStats .TYPE_VALUE ));
210207
211208 executeAsyncWithOrigin (client , ML_ORIGIN , MultiSearchAction .INSTANCE , multiSearchRequest , ActionListener .wrap (
212209 multiSearchResponse -> {
@@ -218,7 +215,7 @@ private void searchStats(String configId, ActionListener<Stats> listener) {
218215 logger .error (
219216 new ParameterizedMessage (
220217 "[{}] Item failure encountered during multi search for request [indices={}, source={}]: {}" ,
221- configId , itemRequest .indices (), itemRequest .source (), itemResponse .getFailureMessage ()),
218+ config . getId () , itemRequest .indices (), itemRequest .source (), itemResponse .getFailureMessage ()),
222219 itemResponse .getFailure ());
223220 listener .onFailure (ExceptionsHelper .serverError (itemResponse .getFailureMessage (), itemResponse .getFailure ()));
224221 return ;
@@ -227,13 +224,13 @@ private void searchStats(String configId, ActionListener<Stats> listener) {
227224 if (hits .length == 0 ) {
228225 // Not found
229226 } else if (hits .length == 1 ) {
230- parseHit (hits [0 ], configId , retrievedStatsHolder );
227+ parseHit (hits [0 ], config . getId () , retrievedStatsHolder );
231228 } else {
232229 throw ExceptionsHelper .serverError ("Found [" + hits .length + "] hits when just one was requested" );
233230 }
234231 }
235232 }
236- listener .onResponse (buildStats (configId ,
233+ listener .onResponse (buildStats (config . getId () ,
237234 retrievedStatsHolder .progress .get (),
238235 retrievedStatsHolder .dataCounts ,
239236 retrievedStatsHolder .memoryUsage ,
@@ -320,9 +317,13 @@ private GetDataFrameAnalyticsStatsAction.Response.Stats buildStats(String concre
320317
321318 private static class RetrievedStatsHolder {
322319
323- private volatile StoredProgress progress = new StoredProgress ( new ProgressTracker (). report ()) ;
320+ private volatile StoredProgress progress ;
324321 private volatile DataCounts dataCounts ;
325322 private volatile MemoryUsage memoryUsage ;
326323 private volatile AnalysisStats analysisStats ;
324+
325+ private RetrievedStatsHolder (List <PhaseProgress > defaultProgress ) {
326+ progress = new StoredProgress (defaultProgress );
327+ }
327328 }
328329}
0 commit comments