1717import org .opensearch .cluster .node .DiscoveryNode ;
1818import org .opensearch .cluster .service .ClusterService ;
1919import org .opensearch .common .lifecycle .AbstractLifecycleComponent ;
20+ import org .opensearch .common .unit .TimeValue ;
2021import org .opensearch .common .util .concurrent .AbstractAsyncTask ;
22+ import org .opensearch .common .util .concurrent .OpenSearchExecutors ;
2123import org .opensearch .index .IndexService ;
2224import org .opensearch .index .IndexSettings ;
2325import org .opensearch .index .engine .SegmentsStats ;
2426import org .opensearch .index .shard .IndexShard ;
27+ import org .opensearch .index .shard .IndexShardState ;
2528import org .opensearch .index .translog .TranslogStats ;
2629import org .opensearch .indices .IndicesService ;
2730import org .opensearch .monitor .jvm .JvmService ;
3033import org .opensearch .threadpool .ThreadPoolStats ;
3134
3235import java .io .IOException ;
33- import java .util .ArrayList ;
3436import java .util .Comparator ;
3537import java .util .List ;
3638import java .util .concurrent .CompletableFuture ;
37- import java .util .concurrent .atomic .AtomicBoolean ;
3839import java .util .stream .Collectors ;
40+ import java .util .stream .StreamSupport ;
3941
4042import static org .opensearch .gateway .remote .RemoteClusterStateService .REMOTE_CLUSTER_STATE_ENABLED_SETTING ;
4143
@@ -55,11 +57,10 @@ public class AutoForceMergeManager extends AbstractLifecycleComponent {
5557 private final IndicesService indicesService ;
5658 private final ClusterService clusterService ;
5759 private final AsyncForceMergeTask task ;
58- private final ConfigurationValidator configurationValidator ;
59- private final NodeValidator nodeValidator ;
60- private final ShardValidator shardValidator ;
60+ private ConfigurationValidator configurationValidator ;
61+ private NodeValidator nodeValidator ;
62+ private ShardValidator shardValidator ;
6163 private final ForceMergeManagerSettings forceMergeManagerSettings ;
62- private final AtomicBoolean initialCheckDone = new AtomicBoolean (false );
6364
6465 private static final Logger logger = LogManager .getLogger (AutoForceMergeManager .class );
6566
@@ -70,32 +71,36 @@ public AutoForceMergeManager(ThreadPool threadPool, OsService osService, JvmServ
7071 this .indicesService = indicesService ;
7172 this .jvmService = jvmService ;
7273 this .clusterService = clusterService ;
73- this .forceMergeManagerSettings = new ForceMergeManagerSettings (clusterService . getSettings (), clusterService . getClusterSettings (), this );
74+ this .forceMergeManagerSettings = new ForceMergeManagerSettings (clusterService , this :: modifySchedulerInterval );
7475 this .task = new AsyncForceMergeTask ();
75- this .configurationValidator = new ConfigurationValidator ();
76- this .nodeValidator = new NodeValidator ();
77- this .shardValidator = new ShardValidator ();
7876 }
7977
8078 @ Override
8179 protected void doStart () {
80+ this .configurationValidator = new ConfigurationValidator ();
81+ this .nodeValidator = new NodeValidator ();
82+ this .shardValidator = new ShardValidator ();
8283 }
8384
8485 @ Override
8586 protected void doStop () {
86- this .task .close ();
87+ if (task != null ) {
88+ this .task .close ();
89+ }
8790 }
8891
8992 @ Override
9093 protected void doClose () {
91- this .task .close ();
94+ if (task != null ) {
95+ this .task .close ();
96+ }
97+ }
98+
99+ private void modifySchedulerInterval (TimeValue schedulerInterval ) {
100+ this .task .setInterval (schedulerInterval );
92101 }
93102
94103 private void triggerForceMerge () {
95- if (forceMergeManagerSettings .isAutoForceMergeFeatureEnabled () == false ) {
96- logger .debug ("Cluster configuration shows auto force merge feature is disabled. Closing task." );
97- return ;
98- }
99104 if (configurationValidator .hasWarmNodes () == false ) {
100105 logger .debug ("No warm nodes found. Skipping Auto Force merge." );
101106 return ;
@@ -104,19 +109,8 @@ private void triggerForceMerge() {
104109 logger .debug ("Node capacity constraints are not allowing to trigger auto ForceMerge" );
105110 return ;
106111 }
107- List <IndexShard > shards = new ArrayList <>();
108- for (IndexService indexService : indicesService ) {
109- for (IndexShard shard : indexService ) {
110- if (shard .routingEntry ().primary ()) {
111- if (shardValidator .validate (shard ).isAllowed ()) {
112- shards .add (shard );
113- }
114- }
115- }
116- }
117- List <IndexShard > sortedShards = getSortedShardsByTranslogAge (shards );
118112 int iteration = nodeValidator .getMaxConcurrentForceMerges ();
119- for (IndexShard shard : sortedShards ) {
113+ for (IndexShard shard : getShardsBasedOnSorting ( indicesService ) ) {
120114 if (iteration == 0 || nodeValidator .validate ().isAllowed () == false ) {
121115 logger .debug ("Node conditions no longer suitable for force merge" );
122116 break ;
@@ -125,7 +119,7 @@ private void triggerForceMerge() {
125119 CompletableFuture .runAsync (() -> {
126120 try {
127121 shard .forceMerge (new ForceMergeRequest ()
128- .maxNumSegments (forceMergeManagerSettings .getSegmentCountThreshold ()));
122+ .maxNumSegments (forceMergeManagerSettings .getSegmentCount ()));
129123 logger .debug ("Merging is completed successfully for the shard {}" , shard .shardId ());
130124 } catch (IOException e ) {
131125 logger .error ("Error during force merge for shard {}" , shard .shardId (), e );
@@ -142,9 +136,11 @@ private void triggerForceMerge() {
142136 }
143137 }
144138
145- private List <IndexShard > getSortedShardsByTranslogAge (List <IndexShard > shards ) {
146-
147- return shards .stream ()
139+ private List <IndexShard > getShardsBasedOnSorting (Iterable <IndexService > indicesService ) {
140+ return StreamSupport .stream (indicesService .spliterator (), false )
141+ .flatMap (indexService -> StreamSupport .stream (indexService .spliterator (), false ))
142+ .filter (shard -> shard .routingEntry ().primary ())
143+ .filter (shard -> shardValidator .validate (shard ).isAllowed ())
148144 .sorted (new ShardAgeComparator ())
149145 .collect (Collectors .toList ());
150146 }
@@ -173,10 +169,16 @@ private long getEarliestLastModifiedAge(IndexShard shard) {
173169 */
174170 protected class ConfigurationValidator implements ValidationStrategy {
175171
176- private boolean isOnlyDataNode = false ;
172+ private final boolean isOnlyDataNode ;
177173 private boolean isRemoteStoreEnabled = false ;
178174 private boolean hasWarmNodes = false ;
179175
176+ ConfigurationValidator () {
177+ DiscoveryNode localNode = clusterService .localNode ();
178+ isOnlyDataNode = localNode .isDataNode () && !localNode .isWarmNode ();
179+ isRemoteStoreEnabled = isRemoteStorageEnabled ();
180+ }
181+
180182 /**
181183 * Validates the node configuration against required criteria.
182184 * This method first ensures initialization is complete, then checks if the node
@@ -191,9 +193,9 @@ public ValidationResult validate() {
191193 logger .debug ("Cluster configuration shows auto force merge feature is disabled. Closing task." );
192194 return new ValidationResult (false );
193195 }
194- initializeIfNeeded ();
195196 if (isRemoteStoreEnabled == false ) {
196197 logger .debug ("Cluster configuration is not meeting the criteria. Closing task." );
198+ task .close ();
197199 return new ValidationResult (false );
198200 }
199201 if (isOnlyDataNode == false ) {
@@ -204,23 +206,6 @@ public ValidationResult validate() {
204206 return new ValidationResult (true );
205207 }
206208
207- /**
208- * Initializes the configuration check results if not already done.
209- * This method performs a one-time check of:
210- * - Node type (must be data node but not warm node)
211- * - Remote store configuration
212- * The results are cached to avoid repeated checks.
213- * Thread-safe through atomic operation on initialCheckDone.
214- */
215- private void initializeIfNeeded () {
216- if (initialCheckDone .get () == false ) {
217- DiscoveryNode localNode = clusterService .localNode ();
218- isOnlyDataNode = localNode .isDataNode () && !localNode .isWarmNode ();
219- isRemoteStoreEnabled = isRemoteStorageEnabled ();
220- initialCheckDone .set (true );
221- }
222- }
223-
224209 /**
225210 * Checks if remote storage is enabled in the cluster settings.
226211 */
@@ -249,18 +234,16 @@ private boolean hasWarmNodes() {
249234 */
250235 protected class NodeValidator implements ValidationStrategy {
251236
252- private int maxConcurrentForceMerges ;
253-
254237 @ Override
255238 public ValidationResult validate () {
256239 double cpuPercent = osService .stats ().getCpu ().getPercent ();
257240 if (cpuPercent >= forceMergeManagerSettings .getCpuThreshold ()) {
258- logger .debug ("CPU usage too high : {}% " , cpuPercent );
241+ logger .debug ("CPU usage: {} breached the threshold : {}" , cpuPercent , forceMergeManagerSettings . getCpuThreshold () );
259242 return new ValidationResult (false );
260243 }
261244 double jvmUsedPercent = jvmService .stats ().getMem ().getHeapUsedPercent ();
262245 if (jvmUsedPercent >= forceMergeManagerSettings .getJvmThreshold ()) {
263- logger .debug ("JVM memory usage too high : {}% " , jvmUsedPercent );
246+ logger .debug ("JVM memory: {}% breached the threshold : {}" , jvmUsedPercent , forceMergeManagerSettings . getJvmThreshold () );
264247 return new ValidationResult (false );
265248 }
266249 if (areForceMergeThreadsAvailable () == false ) {
@@ -273,17 +256,14 @@ public ValidationResult validate() {
273256 private boolean areForceMergeThreadsAvailable () {
274257 for (ThreadPoolStats .Stats stats : threadPool .stats ()) {
275258 if (stats .getName ().equals (ThreadPool .Names .FORCE_MERGE )) {
276- this .maxConcurrentForceMerges = Math .max (1 , stats .getThreads ()) * forceMergeManagerSettings .getConcurrencyMultiplier ();
277- if ((stats .getQueue () == 0 ))
278- // If force merge thread count is set by the customer( greater than 0) and active thread count is already equal or more than this threshold block skip any more force merges
279- return forceMergeManagerSettings .getForceMergeThreadCount () <= 0 || stats .getActive () < forceMergeManagerSettings .getForceMergeThreadCount ();
259+ return stats .getQueue () == 0 ;
280260 }
281261 }
282262 return false ;
283263 }
284264
285265 public Integer getMaxConcurrentForceMerges () {
286- return this . maxConcurrentForceMerges ;
266+ return Math . max ( 1 , ( OpenSearchExecutors . allocatedProcessors ( clusterService . getSettings ()) / 8 )) * forceMergeManagerSettings . getConcurrencyMultiplier () ;
287267 }
288268 }
289269
@@ -302,31 +282,39 @@ public ValidationResult validate(IndexShard shard) {
302282 logger .debug ("No shard found." );
303283 return new ValidationResult (false );
304284 }
285+ if (shard .state () != IndexShardState .STARTED ) {
286+ logger .debug ("Shard({}) skipped: Shard is not in started state." , shard .shardId ());
287+ return new ValidationResult (false );
288+ }
305289 if (isIndexWarmCandidate (shard ) == false ) {
306- logger .debug ("Shard {} doesn't belong to a warm candidate index" , shard .shardId ());
290+ logger .debug ("Shard({}) skipped: Shard doesn't belong to a warm candidate index" , shard .shardId ());
307291 return new ValidationResult (false );
308292 }
309293 CommonStats stats = new CommonStats (indicesService .getIndicesQueryCache (), shard , flags );
310294 SegmentsStats segmentsStats = stats .getSegments ();
311295 TranslogStats translogStats = stats .getTranslog ();
312- if (segmentsStats != null && segmentsStats .getCount () <= forceMergeManagerSettings .getSegmentCountThreshold ()) {
313- logger .debug ("Shard {} doesn't have enough segments to merge." , shard .shardId ());
296+ if (segmentsStats != null && segmentsStats .getCount () <= forceMergeManagerSettings .getSegmentCount ()) {
297+ logger .debug (
298+ "Shard({}) skipped: Shard has {} segments, not exceeding threshold of {}" ,
299+ shard .shardId (),
300+ segmentsStats .getCount (),
301+ forceMergeManagerSettings .getSegmentCount ()
302+ );
314303 return new ValidationResult (false );
315304 }
316305 if (translogStats != null && translogStats .getEarliestLastModifiedAge () < forceMergeManagerSettings .getSchedulerInterval ().getMillis ()) {
317- logger .debug ("Shard {} translog is too recent." , shard .shardId ());
306+ logger .debug ("Shard({}) skipped: Translog is too recent. Age({}ms)" ,
307+ shard .shardId (),
308+ translogStats .getEarliestLastModifiedAge ()
309+ );
318310 return new ValidationResult (false );
319311 }
320312 return new ValidationResult (true );
321313 }
322314
323315 private boolean isIndexWarmCandidate (IndexShard shard ) {
324316 IndexSettings indexSettings = shard .indexSettings ();
325- return indexSettings .getScopedSettings ().get (IndexSettings .INDEX_IS_WARM_CANDIDATE_INDEX );
326- }
327-
328- private boolean isRelocating (IndexShard shard ){
329- return false ;
317+ return indexSettings .getScopedSettings ().get (IndexSettings .INDEX_ALLOW_AUTO_FORCE_MERGES );
330318 }
331319 }
332320
@@ -393,7 +381,7 @@ protected boolean mustReschedule() {
393381 */
394382 @ Override
395383 protected void runInternal () {
396- if (initialCheckDone . get () == false && configurationValidator .validate ().isAllowed () == false ) {
384+ if (configurationValidator .validate ().isAllowed () == false ) {
397385 return ;
398386 }
399387 triggerForceMerge ();
0 commit comments