1919import org .elasticsearch .action .admin .indices .refresh .RefreshAction ;
2020import org .elasticsearch .action .admin .indices .refresh .RefreshRequest ;
2121import org .elasticsearch .action .admin .indices .refresh .RefreshResponse ;
22+ import org .elasticsearch .action .bulk .BulkItemResponse ;
2223import org .elasticsearch .action .support .ContextPreservingActionListener ;
2324import org .elasticsearch .client .Client ;
2425import org .elasticsearch .client .ParentTaskAssigningClient ;
@@ -214,6 +215,7 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
214215 // Reindexing is complete; start analytics
215216 ActionListener <BulkByScrollResponse > reindexCompletedListener = ActionListener .wrap (
216217 reindexResponse -> {
218+
217219 // If the reindex task is canceled, this listener is called.
218220 // Consequently, we should not signal reindex completion.
219221 if (task .isStopping ()) {
@@ -222,7 +224,18 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
222224 task .markAsCompleted ();
223225 return ;
224226 }
227+
225228 task .setReindexingTaskId (null );
229+
230+ Exception reindexError = getReindexError (task .getParams ().getId (), reindexResponse );
231+ if (reindexError != null ) {
232+ task .markAsFailed (reindexError );
233+ return ;
234+ }
235+
236+ LOGGER .debug ("[{}] Reindex completed; created [{}]; retries [{}]" , task .getParams ().getId (),
237+ reindexResponse .getCreated (), reindexResponse .getBulkRetries ());
238+
226239 auditor .info (
227240 config .getId (),
228241 Messages .getMessage (Messages .DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING , config .getDest ().getIndex (),
@@ -251,6 +264,7 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
251264 reindexRequest .setDestIndex (config .getDest ().getIndex ());
252265 reindexRequest .setScript (new Script ("ctx._source." + DestinationIndex .ID_COPY + " = ctx._id" ));
253266 reindexRequest .setParentTask (task .getParentTaskId ());
267+ reindexRequest .getSearchRequest ().allowPartialSearchResults (false );
254268
255269 final ThreadContext threadContext = parentTaskClient .threadPool ().getThreadContext ();
256270 final Supplier <ThreadContext .StoredContext > supplier = threadContext .newRestorableContext (false );
@@ -295,6 +309,26 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
295309 new GetIndexRequest ().indices (config .getDest ().getIndex ()), destIndexListener );
296310 }
297311
312+ private static Exception getReindexError (String jobId , BulkByScrollResponse reindexResponse ) {
313+ if (reindexResponse .getBulkFailures ().isEmpty () == false ) {
314+ LOGGER .error ("[{}] reindexing encountered {} failures" , jobId ,
315+ reindexResponse .getBulkFailures ().size ());
316+ for (BulkItemResponse .Failure failure : reindexResponse .getBulkFailures ()) {
317+ LOGGER .error ("[{}] reindexing failure: {}" , jobId , failure );
318+ }
319+ return ExceptionsHelper .serverError ("reindexing encountered " + reindexResponse .getBulkFailures ().size () + " failures" );
320+ }
321+ if (reindexResponse .getReasonCancelled () != null ) {
322+ LOGGER .error ("[{}] reindex task got cancelled with reason [{}]" , jobId , reindexResponse .getReasonCancelled ());
323+ return ExceptionsHelper .serverError ("reindex task got cancelled with reason [" + reindexResponse .getReasonCancelled () + "]" );
324+ }
325+ if (reindexResponse .isTimedOut ()) {
326+ LOGGER .error ("[{}] reindex task timed out after [{}]" , jobId , reindexResponse .getTook ().getStringRep ());
327+ return ExceptionsHelper .serverError ("reindex task timed out after [" + reindexResponse .getTook ().getStringRep () + "]" );
328+ }
329+ return null ;
330+ }
331+
298332 private static boolean isTaskCancelledException (Exception error ) {
299333 return ExceptionsHelper .unwrapCause (error ) instanceof TaskCancelledException
300334 || ExceptionsHelper .unwrapCause (error .getCause ()) instanceof TaskCancelledException ;
0 commit comments