Skip to content

Commit 9fe9c14

Browse files
committed
Increase default maxMergeAtOnce from 10 to 30
1 parent 137683e commit 9fe9c14

File tree

7 files changed

+84
-16
lines changed

7 files changed

+84
-16
lines changed

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ public void apply(Settings value, Settings current, Settings previous) {
297297
IndicesQueryCache.INDICES_CACHE_QUERY_SIZE_SETTING,
298298
IndicesQueryCache.INDICES_CACHE_QUERY_COUNT_SETTING,
299299
IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING,
300+
IndicesService.CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING,
300301
IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING,
301302
IndicesService.CLUSTER_MINIMUM_INDEX_REFRESH_INTERVAL_SETTING,
302303
IndicesService.INDICES_ID_FIELD_DATA_ENABLED_SETTING,

server/src/main/java/org/opensearch/index/IndexModule.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -631,7 +631,8 @@ public IndexService newIndexService(
631631
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
632632
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
633633
RecoverySettings recoverySettings,
634-
RemoteStoreSettings remoteStoreSettings
634+
RemoteStoreSettings remoteStoreSettings,
635+
Supplier<Integer> clusterDefaultMaxMergeAtOnceSupplier
635636
) throws IOException {
636637
return newIndexService(
637638
indexCreationContext,
@@ -656,7 +657,8 @@ public IndexService newIndexService(
656657
recoverySettings,
657658
remoteStoreSettings,
658659
(s) -> {},
659-
shardId -> ReplicationStats.empty()
660+
shardId -> ReplicationStats.empty(),
661+
clusterDefaultMaxMergeAtOnceSupplier
660662
);
661663
}
662664

@@ -683,7 +685,8 @@ public IndexService newIndexService(
683685
RecoverySettings recoverySettings,
684686
RemoteStoreSettings remoteStoreSettings,
685687
Consumer<IndexShard> replicator,
686-
Function<ShardId, ReplicationStats> segmentReplicationStatsProvider
688+
Function<ShardId, ReplicationStats> segmentReplicationStatsProvider,
689+
Supplier<Integer> clusterDefaultMaxMergeAtOnceSupplier
687690
) throws IOException {
688691
final IndexEventListener eventListener = freeze();
689692
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
@@ -746,7 +749,8 @@ public IndexService newIndexService(
746749
fileCache,
747750
compositeIndexSettings,
748751
replicator,
749-
segmentReplicationStatsProvider
752+
segmentReplicationStatsProvider,
753+
clusterDefaultMaxMergeAtOnceSupplier
750754
);
751755
success = true;
752756
return indexService;

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
198198
private final CompositeIndexSettings compositeIndexSettings;
199199
private final Consumer<IndexShard> replicator;
200200
private final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider;
201+
private final Supplier<Integer> clusterDefaultMaxMergeAtOnce;
201202

202203
public IndexService(
203204
IndexSettings indexSettings,
@@ -237,7 +238,8 @@ public IndexService(
237238
FileCache fileCache,
238239
CompositeIndexSettings compositeIndexSettings,
239240
Consumer<IndexShard> replicator,
240-
Function<ShardId, ReplicationStats> segmentReplicationStatsProvider
241+
Function<ShardId, ReplicationStats> segmentReplicationStatsProvider,
242+
Supplier<Integer> clusterDefaultMaxMergeAtOnceSupplier
241243
) {
242244
super(indexSettings);
243245
this.allowExpensiveQueries = allowExpensiveQueries;
@@ -325,6 +327,9 @@ public IndexService(
325327
this.fileCache = fileCache;
326328
this.replicator = replicator;
327329
this.segmentReplicationStatsProvider = segmentReplicationStatsProvider;
330+
this.clusterDefaultMaxMergeAtOnce = clusterDefaultMaxMergeAtOnceSupplier;
331+
332+
indexSettings.setMaxMergesAtOnce(this.clusterDefaultMaxMergeAtOnce.get());
328333
updateFsyncTaskIfNecessary();
329334
}
330335

@@ -362,7 +367,8 @@ public IndexService(
362367
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
363368
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
364369
RecoverySettings recoverySettings,
365-
RemoteStoreSettings remoteStoreSettings
370+
RemoteStoreSettings remoteStoreSettings,
371+
Supplier<Integer> clusterDefaultMaxMergeAtOnce
366372
) {
367373
this(
368374
indexSettings,
@@ -402,7 +408,8 @@ public IndexService(
402408
null,
403409
null,
404410
s -> {},
405-
(shardId) -> ReplicationStats.empty()
411+
(shardId) -> ReplicationStats.empty(),
412+
clusterDefaultMaxMergeAtOnce
406413
);
407414
}
408415

@@ -1122,11 +1129,12 @@ private void updateReplicationTask() {
11221129
}
11231130

11241131
/**
1125-
* Called whenever the refresh interval changes. This can happen in 2 cases -
1126-
* 1. {@code cluster.default.index.refresh_interval} cluster setting changes. The change would only happen for
1127-
* indexes relying on cluster default.
1128-
* 2. {@code index.refresh_interval} index setting changes.
1132+
* Called whenever the cluster level {@code cluster.default.index.max_merge_at_once} changes.
11291133
*/
1134+
public void onDefaultMaxMergeAtOnceChanged(int defaultMaxMergeAtOnce) {
1135+
indexSettings.setMaxMergesAtOnce(defaultMaxMergeAtOnce);
1136+
}
1137+
11301138
public void onRefreshIntervalChange() {
11311139
if (refreshTask.getInterval().equals(getRefreshInterval())) {
11321140
return;

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1250,6 +1250,15 @@ private void setRefreshInterval(TimeValue timeValue) {
12501250
this.refreshInterval = timeValue;
12511251
}
12521252

1253+
/**
1254+
* Update the maxMergesAtOnce if no index level override exists in index level settings
1255+
*/
1256+
void setMaxMergesAtOnce(int newMaxMergesAtOnce) {
1257+
if (TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.exists(getSettings()) == false) {
1258+
tieredMergePolicyProvider.setMaxMergesAtOnce(newMaxMergesAtOnce);
1259+
}
1260+
}
1261+
12531262
/**
12541263
* Returns the settings for this index. These settings contain the node and index level settings where
12551264
* settings that are specified on both index and node level are overwritten by the index settings.

server/src/main/java/org/opensearch/index/TieredMergePolicyProvider.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,9 @@ public final class TieredMergePolicyProvider implements MergePolicyProvider {
143143
*/
144144
public static final ByteSizeValue DEFAULT_FLOOR_SEGMENT = new ByteSizeValue(16, ByteSizeUnit.MB);
145145

146-
public static final int DEFAULT_MAX_MERGE_AT_ONCE = 10;
146+
public static final int MIN_DEFAULT_MAX_MERGE_AT_ONCE = 2;
147+
public static final int DEFAULT_MAX_MERGE_AT_ONCE = 30;
148+
147149
public static final ByteSizeValue DEFAULT_MAX_MERGED_SEGMENT = new ByteSizeValue(5, ByteSizeUnit.GB);
148150
public static final double DEFAULT_SEGMENTS_PER_TIER = 10.0d;
149151
public static final double DEFAULT_RECLAIM_DELETES_WEIGHT = 2.0d;
@@ -173,7 +175,7 @@ public final class TieredMergePolicyProvider implements MergePolicyProvider {
173175
public static final Setting<Integer> INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING = Setting.intSetting(
174176
"index.merge.policy.max_merge_at_once",
175177
DEFAULT_MAX_MERGE_AT_ONCE,
176-
2,
178+
MIN_DEFAULT_MAX_MERGE_AT_ONCE,
177179
Property.Dynamic,
178180
Property.IndexScope
179181
);
@@ -225,7 +227,7 @@ public final class TieredMergePolicyProvider implements MergePolicyProvider {
225227
INDEX_MERGE_ENABLED
226228
);
227229
}
228-
maxMergeAtOnce = adjustMaxMergeAtOnceIfNeeded(maxMergeAtOnce, segmentsPerTier);
230+
// maxMergeAtOnce = adjustMaxMergeAtOnceIfNeeded(maxMergeAtOnce, segmentsPerTier);
229231
tieredMergePolicy.setNoCFSRatio(indexSettings.getValue(INDEX_COMPOUND_FORMAT_SETTING));
230232
tieredMergePolicy.setForceMergeDeletesPctAllowed(forceMergeDeletesPctAllowed);
231233
tieredMergePolicy.setFloorSegmentMB(floorSegment.getMbFrac());

server/src/main/java/org/opensearch/indices/IndicesService.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,8 @@
209209
import static org.opensearch.core.common.util.CollectionUtils.arrayAsArrayList;
210210
import static org.opensearch.index.IndexService.IndexCreationContext.CREATE_INDEX;
211211
import static org.opensearch.index.IndexService.IndexCreationContext.METADATA_VERIFICATION;
212+
import static org.opensearch.index.TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE;
213+
import static org.opensearch.index.TieredMergePolicyProvider.MIN_DEFAULT_MAX_MERGE_AT_ONCE;
212214
import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;
213215
import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_MAX_SIZE_ALLOWED_IN_CACHE_SETTING;
214216
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
@@ -275,6 +277,21 @@ public class IndicesService extends AbstractLifecycleComponent
275277
Property.Dynamic
276278
);
277279

280+
/**
281+
* This setting is used to set the maxMergeAtOnce parameter for {@code TieredMergePolicy}
282+
* when the {@code index.merge.policy.max_merge_at_once} index setting is not provided during index creation
283+
* or when the existing {@code index.merge.policy.max_merge_at_once} index setting is set as null.
284+
* This comes handy when the user wants to change the maxMergeAtOnce across all indexes created in a cluster
285+
* which is different from the default.
286+
*/
287+
public static final Setting<Integer> CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING = Setting.intSetting(
288+
"cluster.default.index.max_merge_at_once",
289+
DEFAULT_MAX_MERGE_AT_ONCE,
290+
MIN_DEFAULT_MAX_MERGE_AT_ONCE,
291+
Property.NodeScope,
292+
Property.Dynamic
293+
);
294+
278295
/**
279296
* This setting is used to set the minimum refresh interval applicable for all indexes in a cluster. The
280297
* {@code cluster.default.index.refresh_interval} setting value needs to be higher than this setting's value. Index
@@ -369,6 +386,7 @@ public class IndicesService extends AbstractLifecycleComponent
369386
private final Consumer<IndexShard> replicator;
370387
private final Function<ShardId, ReplicationStats> segmentReplicationStatsProvider;
371388
private volatile int maxSizeInRequestCache;
389+
private volatile int defaultMaxMergeAtOnce;
372390

373391
@Override
374392
protected void doStart() {
@@ -523,6 +541,10 @@ protected void closeInternal() {
523541
this.maxSizeInRequestCache = INDICES_REQUEST_CACHE_MAX_SIZE_ALLOWED_IN_CACHE_SETTING.get(clusterService.getSettings());
524542
clusterService.getClusterSettings()
525543
.addSettingsUpdateConsumer(INDICES_REQUEST_CACHE_MAX_SIZE_ALLOWED_IN_CACHE_SETTING, this::setMaxSizeInRequestCache);
544+
545+
this.defaultMaxMergeAtOnce = CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING.get(clusterService.getSettings());
546+
clusterService.getClusterSettings()
547+
.addSettingsUpdateConsumer(CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING, this::onDefaultMaxMergeAtOnceUpdate);
526548
}
527549

528550
public IndicesService(
@@ -607,6 +629,22 @@ private void onRefreshIntervalUpdate(TimeValue clusterDefaultRefreshInterval) {
607629
}
608630
}
609631

632+
/**
633+
* The changes to dynamic cluster setting {@code cluster.default.index.max_merge_at_once} needs to be updated. This
634+
* method gets called whenever the setting changes. We set the instance variable with the updated value as this is
635+
* also a supplier to all IndexService that have been created on the node. We also notify the change to all
636+
* IndexService instances that are created on this node.
637+
*
638+
* @param newDefaultMaxMergeAtOnce the updated cluster default maxMergeAtOnce.
639+
*/
640+
private void onDefaultMaxMergeAtOnceUpdate(int newDefaultMaxMergeAtOnce) {
641+
this.defaultMaxMergeAtOnce = newDefaultMaxMergeAtOnce; // do we need this?
642+
for (Map.Entry<String, IndexService> entry : indices.entrySet()) {
643+
IndexService indexService = entry.getValue();
644+
indexService.onDefaultMaxMergeAtOnceChanged(newDefaultMaxMergeAtOnce);
645+
}
646+
}
647+
610648
private static BiFunction<IndexSettings, ShardRouting, TranslogFactory> getTranslogFactorySupplier(
611649
Supplier<RepositoriesService> repositoriesServiceSupplier,
612650
ThreadPool threadPool,
@@ -1009,7 +1047,8 @@ private synchronized IndexService createIndexService(
10091047
this.recoverySettings,
10101048
this.remoteStoreSettings,
10111049
replicator,
1012-
segmentReplicationStatsProvider
1050+
segmentReplicationStatsProvider,
1051+
this::getClusterDefaultMaxMergeAtOnce
10131052
);
10141053
}
10151054

@@ -2155,6 +2194,10 @@ private TimeValue getClusterDefaultRefreshInterval() {
21552194
return this.clusterDefaultRefreshInterval;
21562195
}
21572196

2197+
private Integer getClusterDefaultMaxMergeAtOnce() {
2198+
return this.defaultMaxMergeAtOnce;
2199+
}
2200+
21582201
public RemoteStoreSettings getRemoteStoreSettings() {
21592202
return this.remoteStoreSettings;
21602203
}

server/src/test/java/org/opensearch/index/IndexModuleTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,8 @@ private IndexService newIndexService(IndexModule module) throws IOException {
266266
DefaultRecoverySettings.INSTANCE,
267267
DefaultRemoteStoreSettings.INSTANCE,
268268
s -> {},
269-
null
269+
null,
270+
() -> TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE
270271
);
271272
}
272273

0 commit comments

Comments
 (0)