@@ -85,13 +85,14 @@ public void run(TransportStartDatafeedAction.DatafeedTask task, Consumer<Excepti
8585
8686 ActionListener <DatafeedJob > datafeedJobHandler = ActionListener .wrap (
8787 datafeedJob -> {
88+ String jobId = datafeedJob .getJobId ();
8889 Holder holder = new Holder (task , datafeedId , datafeedJob ,
89- new ProblemTracker (auditor , datafeedJob . getJobId () ), finishHandler );
90+ new ProblemTracker (auditor , jobId ), finishHandler );
9091 runningDatafeedsOnThisNode .put (task .getAllocationId (), holder );
9192 task .updatePersistentTaskState (DatafeedState .STARTED , new ActionListener <PersistentTask <?>>() {
9293 @ Override
9394 public void onResponse (PersistentTask <?> persistentTask ) {
94- taskRunner .runWhenJobIsOpened (task );
95+ taskRunner .runWhenJobIsOpened (task , jobId );
9596 }
9697
9798 @ Override
@@ -267,17 +268,23 @@ protected void doRun() {
267268 }
268269 }
269270
270- private String getJobId (TransportStartDatafeedAction .DatafeedTask task ) {
271- return runningDatafeedsOnThisNode .get (task .getAllocationId ()).getJobId ();
271+ /**
272+ * Returns <code>null</code> if the datafeed is not running on this node.
273+ */
274+ private String getJobIdIfDatafeedRunningOnThisNode (TransportStartDatafeedAction .DatafeedTask task ) {
275+ Holder holder = runningDatafeedsOnThisNode .get (task .getAllocationId ());
276+ if (holder == null ) {
277+ return null ;
278+ }
279+ return holder .getJobId ();
272280 }
273281
274- private JobState getJobState (PersistentTasksCustomMetaData tasks , TransportStartDatafeedAction . DatafeedTask datafeedTask ) {
275- return MlTasks .getJobStateModifiedForReassignments (getJobId ( datafeedTask ) , tasks );
282+ private JobState getJobState (PersistentTasksCustomMetaData tasks , String jobId ) {
283+ return MlTasks .getJobStateModifiedForReassignments (jobId , tasks );
276284 }
277285
278- private boolean jobHasOpenAutodetectCommunicator (PersistentTasksCustomMetaData tasks ,
279- TransportStartDatafeedAction .DatafeedTask datafeedTask ) {
280- PersistentTasksCustomMetaData .PersistentTask <?> jobTask = MlTasks .getJobTask (getJobId (datafeedTask ), tasks );
286+ private boolean jobHasOpenAutodetectCommunicator (PersistentTasksCustomMetaData tasks , String jobId ) {
287+ PersistentTasksCustomMetaData .PersistentTask <?> jobTask = MlTasks .getJobTask (jobId , tasks );
281288 if (jobTask == null ) {
282289 return false ;
283290 }
@@ -492,14 +499,14 @@ private class TaskRunner implements ClusterStateListener {
492499
493500 private final List <TransportStartDatafeedAction .DatafeedTask > tasksToRun = new CopyOnWriteArrayList <>();
494501
495- private void runWhenJobIsOpened (TransportStartDatafeedAction .DatafeedTask datafeedTask ) {
502+ private void runWhenJobIsOpened (TransportStartDatafeedAction .DatafeedTask datafeedTask , String jobId ) {
496503 ClusterState clusterState = clusterService .state ();
497504 PersistentTasksCustomMetaData tasks = clusterState .getMetaData ().custom (PersistentTasksCustomMetaData .TYPE );
498- if (getJobState (tasks , datafeedTask ) == JobState .OPENED && jobHasOpenAutodetectCommunicator (tasks , datafeedTask )) {
505+ if (getJobState (tasks , jobId ) == JobState .OPENED && jobHasOpenAutodetectCommunicator (tasks , jobId )) {
499506 runTask (datafeedTask );
500507 } else {
501508 logger .info ("Datafeed [{}] is waiting for job [{}] to be opened" ,
502- datafeedTask .getDatafeedId (), getJobId ( datafeedTask ) );
509+ datafeedTask .getDatafeedId (), jobId );
503510 tasksToRun .add (datafeedTask );
504511 }
505512 }
@@ -530,17 +537,19 @@ public void clusterChanged(ClusterChangedEvent event) {
530537
531538 List <TransportStartDatafeedAction .DatafeedTask > remainingTasks = new ArrayList <>();
532539 for (TransportStartDatafeedAction .DatafeedTask datafeedTask : tasksToRun ) {
533- if (runningDatafeedsOnThisNode .containsKey (datafeedTask .getAllocationId ()) == false ) {
540+ String jobId = getJobIdIfDatafeedRunningOnThisNode (datafeedTask );
541+ if (jobId == null ) {
542+ // Datafeed is not running on this node any more
534543 continue ;
535544 }
536- JobState jobState = getJobState (currentTasks , datafeedTask );
537- if (jobState == JobState .OPENING || jobHasOpenAutodetectCommunicator (currentTasks , datafeedTask ) == false ) {
545+ JobState jobState = getJobState (currentTasks , jobId );
546+ if (jobState == JobState .OPENING || jobHasOpenAutodetectCommunicator (currentTasks , jobId ) == false ) {
538547 remainingTasks .add (datafeedTask );
539548 } else if (jobState == JobState .OPENED ) {
540549 runTask (datafeedTask );
541550 } else {
542551 logger .warn ("Datafeed [{}] is stopping because job [{}] state is [{}]" ,
543- datafeedTask .getDatafeedId (), getJobId ( datafeedTask ) , jobState );
552+ datafeedTask .getDatafeedId (), jobId , jobState );
544553 datafeedTask .stop ("job_never_opened" , TimeValue .timeValueSeconds (20 ));
545554 }
546555 }
0 commit comments