|
209 | 209 | import static org.opensearch.core.common.util.CollectionUtils.arrayAsArrayList; |
210 | 210 | import static org.opensearch.index.IndexService.IndexCreationContext.CREATE_INDEX; |
211 | 211 | import static org.opensearch.index.IndexService.IndexCreationContext.METADATA_VERIFICATION; |
| 212 | +import static org.opensearch.index.TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE; |
| 213 | +import static org.opensearch.index.TieredMergePolicyProvider.MIN_DEFAULT_MAX_MERGE_AT_ONCE; |
212 | 214 | import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder; |
213 | 215 | import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_MAX_SIZE_ALLOWED_IN_CACHE_SETTING; |
214 | 216 | import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent; |
@@ -275,6 +277,21 @@ public class IndicesService extends AbstractLifecycleComponent |
275 | 277 | Property.Dynamic |
276 | 278 | ); |
277 | 279 |
|
| 280 | + /** |
| 281 | + * This setting is used to set the maxMergeAtOnce parameter for {@code TieredMergePolicy} |
| 282 | + * when the {@code index.merge.policy.max_merge_at_once} index setting is not provided during index creation |
| 283 | + * or when the existing {@code index.merge.policy.max_merge_at_once} index setting is set as null. |
| 284 | + * This comes handy when the user wants to change the maxMergeAtOnce across all indexes created in a cluster |
| 285 | + * which is different from the default. |
| 286 | + */ |
| 287 | + public static final Setting<Integer> CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING = Setting.intSetting( |
| 288 | + "cluster.default.index.max_merge_at_once", |
| 289 | + DEFAULT_MAX_MERGE_AT_ONCE, |
| 290 | + MIN_DEFAULT_MAX_MERGE_AT_ONCE, |
| 291 | + Property.NodeScope, |
| 292 | + Property.Dynamic |
| 293 | + ); |
| 294 | + |
278 | 295 | /** |
279 | 296 | * This setting is used to set the minimum refresh interval applicable for all indexes in a cluster. The |
280 | 297 | * {@code cluster.default.index.refresh_interval} setting value needs to be higher than this setting's value. Index |
@@ -369,6 +386,7 @@ public class IndicesService extends AbstractLifecycleComponent |
369 | 386 | private final Consumer<IndexShard> replicator; |
370 | 387 | private final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider; |
371 | 388 | private volatile int maxSizeInRequestCache; |
| 389 | + private volatile int defaultMaxMergeAtOnce; |
372 | 390 |
|
373 | 391 | @Override |
374 | 392 | protected void doStart() { |
@@ -523,6 +541,10 @@ protected void closeInternal() { |
523 | 541 | this.maxSizeInRequestCache = INDICES_REQUEST_CACHE_MAX_SIZE_ALLOWED_IN_CACHE_SETTING.get(clusterService.getSettings()); |
524 | 542 | clusterService.getClusterSettings() |
525 | 543 | .addSettingsUpdateConsumer(INDICES_REQUEST_CACHE_MAX_SIZE_ALLOWED_IN_CACHE_SETTING, this::setMaxSizeInRequestCache); |
| 544 | + |
| 545 | + this.defaultMaxMergeAtOnce = CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING.get(clusterService.getSettings()); |
| 546 | + clusterService.getClusterSettings() |
| 547 | + .addSettingsUpdateConsumer(CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING, this::onDefaultMaxMergeAtOnceUpdate); |
526 | 548 | } |
527 | 549 |
|
528 | 550 | public IndicesService( |
@@ -607,6 +629,22 @@ private void onRefreshIntervalUpdate(TimeValue clusterDefaultRefreshInterval) { |
607 | 629 | } |
608 | 630 | } |
609 | 631 |
|
| 632 | + /** |
| 633 | + * The changes to dynamic cluster setting {@code cluster.default.index.max_merge_at_once} needs to be updated. This |
| 634 | + * method gets called whenever the setting changes. We set the instance variable with the updated value as this is |
| 635 | + * also a supplier to all IndexService that have been created on the node. We also notify the change to all |
| 636 | + * IndexService instances that are created on this node. |
| 637 | + * |
| 638 | + * @param newDefaultMaxMergeAtOnce the updated cluster default maxMergeAtOnce. |
| 639 | + */ |
| 640 | + private void onDefaultMaxMergeAtOnceUpdate(int newDefaultMaxMergeAtOnce) { |
| 641 | + this.defaultMaxMergeAtOnce = newDefaultMaxMergeAtOnce; // do we need this? |
| 642 | + for (Map.Entry<String, IndexService> entry : indices.entrySet()) { |
| 643 | + IndexService indexService = entry.getValue(); |
| 644 | + indexService.onDefaultMaxMergeAtOnceChanged(newDefaultMaxMergeAtOnce); |
| 645 | + } |
| 646 | + } |
| 647 | + |
610 | 648 | private static BiFunction<IndexSettings, ShardRouting, TranslogFactory> getTranslogFactorySupplier( |
611 | 649 | Supplier<RepositoriesService> repositoriesServiceSupplier, |
612 | 650 | ThreadPool threadPool, |
@@ -1009,7 +1047,8 @@ private synchronized IndexService createIndexService( |
1009 | 1047 | this.recoverySettings, |
1010 | 1048 | this.remoteStoreSettings, |
1011 | 1049 | replicator, |
1012 | | - segmentReplicationStatsProvider |
| 1050 | + segmentReplicationStatsProvider, |
| 1051 | + this::getClusterDefaultMaxMergeAtOnce |
1013 | 1052 | ); |
1014 | 1053 | } |
1015 | 1054 |
|
@@ -2155,6 +2194,10 @@ private TimeValue getClusterDefaultRefreshInterval() { |
2155 | 2194 | return this.clusterDefaultRefreshInterval; |
2156 | 2195 | } |
2157 | 2196 |
|
| 2197 | + private Integer getClusterDefaultMaxMergeAtOnce() { |
| 2198 | + return this.defaultMaxMergeAtOnce; |
| 2199 | + } |
| 2200 | + |
2158 | 2201 | public RemoteStoreSettings getRemoteStoreSettings() { |
2159 | 2202 | return this.remoteStoreSettings; |
2160 | 2203 | } |
|
0 commit comments