 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
  * If there was an error in step 3 and the config is in both the clusterstate and
  * index then when the migrator retries it must not overwrite an existing job config
  * document as once the index document is present all update operations will function
- * on that rather than the clusterstate
+ * on that rather than the clusterstate.
+ *
+ * The number of configs indexed in each bulk operation is limited by
+ * {@link #MAX_BULK_WRITE_SIZE}. Pairs of datafeeds and jobs are migrated together.
  */
 public class MlConfigMigrator {
 
@@ -77,6 +82,8 @@ public class MlConfigMigrator {
     public static final String MIGRATED_FROM_VERSION = "migrated from version";
     public static final Version MIN_NODE_VERSION = Version.V_6_6_0;
 
+    static final int MAX_BULK_WRITE_SIZE = 100;
+
     private final Client client;
     private final ClusterService clusterService;
@@ -118,12 +125,15 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
             return;
         }
 
+
         logger.debug("migrating ml configurations");
 
-        Collection<DatafeedConfig> datafeedsToMigrate = stoppedDatafeedConfigs(clusterState);
-        List<Job> jobsToMigrate = nonDeletingJobs(closedJobConfigs(clusterState)).stream()
+        Collection<DatafeedConfig> stoppedDatafeeds = stoppedDatafeedConfigs(clusterState);
+        Map<String, Job> eligibleJobs = nonDeletingJobs(closedJobConfigs(clusterState)).stream()
                 .map(MlConfigMigrator::updateJobForMigration)
-                .collect(Collectors.toList());
+                .collect(Collectors.toMap(Job::getId, Function.identity(), (a, b) -> a));
+
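+        // Cap this migration round at MAX_BULK_WRITE_SIZE configs, keeping each
+        // datafeed together with its job (see limitWrites below)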
+        JobsAndDatafeeds jobsAndDatafeedsToMigrate = limitWrites(stoppedDatafeeds, eligibleJobs);
 
         ActionListener<Boolean> unMarkMigrationInProgress = ActionListener.wrap(
                 response -> {
@@ -136,16 +146,16 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener
                 }
         );
 
-        if (datafeedsToMigrate.isEmpty() && jobsToMigrate.isEmpty()) {
+        if (jobsAndDatafeedsToMigrate.totalCount() == 0) {
             unMarkMigrationInProgress.onResponse(Boolean.FALSE);
             return;
         }
 
-        writeConfigToIndex(datafeedsToMigrate, jobsToMigrate, ActionListener.wrap(
+        writeConfigToIndex(jobsAndDatafeedsToMigrate.datafeedConfigs, jobsAndDatafeedsToMigrate.jobs, ActionListener.wrap(
                 failedDocumentIds -> {
-                    List<String> successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, jobsToMigrate);
+                    List<String> successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.jobs);
                     List<String> successfulDatafeedWrites =
-                            filterFailedDatafeedConfigWrites(failedDocumentIds, datafeedsToMigrate);
+                            filterFailedDatafeedConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.datafeedConfigs);
                     removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, unMarkMigrationInProgress);
                 },
                 unMarkMigrationInProgress::onFailure
@@ -350,6 +360,62 @@ public static List<DatafeedConfig> stoppedDatafeedConfigs(ClusterState clusterSt
                 .collect(Collectors.toList());
     }
 
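+    /**
+     * Simple holder for the jobs and datafeeds selected for a single bulk write.
+     */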
+    public static class JobsAndDatafeeds {
+        List<Job> jobs;
+        List<DatafeedConfig> datafeedConfigs;
+
+        private JobsAndDatafeeds() {
+            jobs = new ArrayList<>();
+            datafeedConfigs = new ArrayList<>();
+        }
+
+        public int totalCount() {
+            return jobs.size() + datafeedConfigs.size();
+        }
+    }
+
+    /**
+     * Return at most {@link #MAX_BULK_WRITE_SIZE} configs, favouring
+     * datafeed and job pairs so that if a datafeed is selected its job is too.
+     *
+     * @param datafeedsToMigrate Datafeed configs
+     * @param jobsToMigrate      Job configs keyed by job ID
+     * @return Job and datafeed configs
+     */
+    public static JobsAndDatafeeds limitWrites(Collection<DatafeedConfig> datafeedsToMigrate, Map<String, Job> jobsToMigrate) {
+        JobsAndDatafeeds jobsAndDatafeeds = new JobsAndDatafeeds();
+
+        if (datafeedsToMigrate.size() + jobsToMigrate.size() <= MAX_BULK_WRITE_SIZE) {
+            jobsAndDatafeeds.jobs.addAll(jobsToMigrate.values());
+            jobsAndDatafeeds.datafeedConfigs.addAll(datafeedsToMigrate);
+            return jobsAndDatafeeds;
+        }
+
+        int count = 0;
+
+        // prioritise datafeed and job pairs
+        for (DatafeedConfig datafeedConfig : datafeedsToMigrate) {
+            if (count < MAX_BULK_WRITE_SIZE) {
+                jobsAndDatafeeds.datafeedConfigs.add(datafeedConfig);
+                count++;
+                Job datafeedsJob = jobsToMigrate.remove(datafeedConfig.getJobId());
+                if (datafeedsJob != null) {
+                    jobsAndDatafeeds.jobs.add(datafeedsJob);
+                    count++;
+                }
+            }
+        }
+
+        // then migrate any jobs that do not have a datafeed
+        Iterator<Job> iter = jobsToMigrate.values().iterator();
+        while (iter.hasNext() && count < MAX_BULK_WRITE_SIZE) {
+            jobsAndDatafeeds.jobs.add(iter.next());
+            count++;
+        }
+
+        return jobsAndDatafeeds;
+    }
+
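+    // Illustrative walk-through of limitWrites (assumed inputs, not part of the
+    // production code): with MAX_BULK_WRITE_SIZE = 100, given 70 datafeeds each
+    // paired with an eligible job plus 20 standalone jobs, the first 50 pairs
+    // fill the batch (100 configs); the remaining 20 pairs and 20 standalone
+    // jobs are left for a subsequent migration attempt. Because the size check
+    // happens before each datafeed is added, a final pair can push the total to
+    // MAX_BULK_WRITE_SIZE + 1 rather than split the pair.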
     /**
      * Check for failures in the bulk response and return the
      * Ids of any documents not written to the index