Skip to content

Commit 23d6e8c

Browse files
committed
Make index.merge_on_flush.enabled, index.merge_on_flush.max_full_flush_merge_wait_time, index.merge_on_flush.policy, index.check_pending_flush.enabled dynamic
Signed-off-by: kkewwei <[email protected]> Signed-off-by: kkewwei <[email protected]>
1 parent 370cd8c commit 23d6e8c

File tree

4 files changed

+119
-8
lines changed

4 files changed

+119
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6262
- Null check field names in QueryStringQueryBuilder ([#18194](https://github.com/opensearch-project/OpenSearch/pull/18194))
6363
- Avoid NPE if on SnapshotInfo if 'shallow' boolean not present ([#18187](https://github.com/opensearch-project/OpenSearch/issues/18187))
6464
- Fix 'system call filter not installed' caused when network.host: 0.0.0.0 ([#18309](https://github.com/opensearch-project/OpenSearch/pull/18309))
65+
- Make `index.merge_on_flush.enabled`, `index.merge_on_flush.max_full_flush_merge_wait_time`, `index.merge_on_flush.policy`, `index.check_pending_flush.enabled` dynamic ([#17495](https://github.com/opensearch-project/OpenSearch/pull/17495))
6566

6667
### Security
6768

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@
9595
*/
9696
@PublicApi(since = "1.0.0")
9797
public final class IndexSettings {
98-
private static final String DEFAULT_POLICY = "default";
99-
private static final String MERGE_ON_FLUSH_MERGE_POLICY = "merge-on-flush";
98+
public static final String DEFAULT_POLICY = "default";
99+
public static final String MERGE_ON_FLUSH_MERGE_POLICY = "merge-on-flush";
100100

101101
/**
102102
* Enum representing supported merge policies
@@ -649,7 +649,8 @@ public static IndexMergePolicy fromString(String text) {
649649
public static final Setting<Boolean> INDEX_CHECK_PENDING_FLUSH_ENABLED = Setting.boolSetting(
650650
"index.check_pending_flush.enabled",
651651
true,
652-
Property.IndexScope
652+
Property.IndexScope,
653+
Property.Dynamic
653654
);
654655

655656
public static final Setting<String> TIME_SERIES_INDEX_MERGE_POLICY = Setting.simpleString(
@@ -903,7 +904,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
903904
/**
904905
* Is flush check by write threads enabled or not
905906
*/
906-
private final boolean checkPendingFlushEnabled;
907+
private volatile boolean checkPendingFlushEnabled;
907908
/**
908909
* Is fuzzy set enabled for doc id
909910
*/
@@ -1201,6 +1202,11 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
12011202
IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING,
12021203
this::setRemoteStoreTranslogRepository
12031204
);
1205+
scopedSettings.addSettingsUpdateConsumer(INDEX_CHECK_PENDING_FLUSH_ENABLED, this::setCheckPendingFlushEnabled);
1206+
}
1207+
1208+
public void setCheckPendingFlushEnabled(boolean checkPendingFlushEnabled) {
1209+
this.checkPendingFlushEnabled = checkPendingFlushEnabled;
12041210
}
12051211

12061212
private void setSearchIdleAfter(TimeValue searchIdleAfter) {

server/src/main/java/org/opensearch/index/engine/InternalEngine.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ public class InternalEngine extends Engine {
172172
protected final String historyUUID;
173173

174174
private final OpenSearchConcurrentMergeScheduler mergeScheduler;
175+
private MergePolicy noMergeOnFlushPolicy;
175176
private final ExternalReaderManager externalReaderManager;
176177
private final OpenSearchReaderManager internalReaderManager;
177178

@@ -2337,26 +2338,27 @@ private IndexWriterConfig getIndexWriterConfig() {
23372338
iwc.setMergeScheduler(mergeScheduler);
23382339
// Give us the opportunity to upgrade old segments while performing
23392340
// background merges
2340-
MergePolicy mergePolicy = config().getMergePolicy();
2341+
this.noMergeOnFlushPolicy = config().getMergePolicy();
23412342
// always configure soft-deletes field so an engine with soft-deletes disabled can open a Lucene index with soft-deletes.
23422343
iwc.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
2343-
mergePolicy = new RecoverySourcePruneMergePolicy(
2344+
noMergeOnFlushPolicy = new RecoverySourcePruneMergePolicy(
23442345
SourceFieldMapper.RECOVERY_SOURCE_NAME,
23452346
softDeletesPolicy::getRetentionQuery,
23462347
new SoftDeletesRetentionMergePolicy(
23472348
Lucene.SOFT_DELETES_FIELD,
23482349
softDeletesPolicy::getRetentionQuery,
2349-
new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME)
2350+
new PrunePostingsMergePolicy(noMergeOnFlushPolicy, IdFieldMapper.NAME)
23502351
)
23512352
);
23522353
boolean shuffleForcedMerge = Booleans.parseBoolean(System.getProperty("opensearch.shuffle_forced_merge", Boolean.TRUE.toString()));
23532354
if (shuffleForcedMerge) {
23542355
// We wrap the merge policy for all indices even though it is mostly useful for time-based indices
23552356
// but there should be no overhead for other type of indices so it's simpler than adding a setting
23562357
// to enable it.
2357-
mergePolicy = new ShuffleForcedMergePolicy(mergePolicy);
2358+
noMergeOnFlushPolicy = new ShuffleForcedMergePolicy(noMergeOnFlushPolicy);
23582359
}
23592360

2361+
MergePolicy mergePolicy = noMergeOnFlushPolicy;
23602362
if (config().getIndexSettings().isMergeOnFlushEnabled()) {
23612363
final long maxFullFlushMergeWaitMillis = config().getIndexSettings().getMaxFullFlushMergeWaitTime().millis();
23622364
if (maxFullFlushMergeWaitMillis > 0) {
@@ -2610,6 +2612,33 @@ public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue tran
26102612
// the setting will be re-interpreted if it's set to true
26112613
updateAutoIdTimestamp(Long.MAX_VALUE, true);
26122614
}
2615+
IndexSettings indexSettings = engineConfig.getIndexSettings();
2616+
// In InternalEngine, indexWriter.getConfig() must be a IndexWriterConfig instance.
2617+
IndexWriterConfig indexWriterConfig = (IndexWriterConfig) indexWriter.getConfig();
2618+
if (indexSettings.isCheckPendingFlushEnabled() != indexWriterConfig.isCheckPendingFlushOnUpdate()) {
2619+
indexWriterConfig.setCheckPendingFlushUpdate(indexSettings.isCheckPendingFlushEnabled());
2620+
}
2621+
if (indexSettings.isMergeOnFlushEnabled()) {
2622+
final long maxFullFlushMergeWaitMillis = indexSettings.getMaxFullFlushMergeWaitTime().millis();
2623+
if (maxFullFlushMergeWaitMillis > 0) {
2624+
indexWriterConfig.setMaxFullFlushMergeWaitMillis(maxFullFlushMergeWaitMillis);
2625+
final Optional<UnaryOperator<MergePolicy>> mergeOnFlushPolicy = indexSettings.getMergeOnFlushPolicy();
2626+
if (mergeOnFlushPolicy.isPresent()) {
2627+
indexWriterConfig.setMergePolicy(new OpenSearchMergePolicy(mergeOnFlushPolicy.get().apply(noMergeOnFlushPolicy)));
2628+
} else {
2629+
indexWriterConfig.setMergePolicy(new OpenSearchMergePolicy(noMergeOnFlushPolicy));
2630+
}
2631+
} else {
2632+
logger.warn(
2633+
"The {} is enabled but {} is set to 0, merge on flush will not be activated",
2634+
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(),
2635+
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey()
2636+
);
2637+
}
2638+
} else {
2639+
indexWriterConfig.setMaxFullFlushMergeWaitMillis(0);
2640+
indexWriterConfig.setMergePolicy(new OpenSearchMergePolicy(noMergeOnFlushPolicy));
2641+
}
26132642
final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getDeletionPolicy();
26142643
translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis());
26152644
translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes());

server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.apache.lucene.index.Terms;
6868
import org.apache.lucene.index.TermsEnum;
6969
import org.apache.lucene.index.TieredMergePolicy;
70+
import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
7071
import org.apache.lucene.search.IndexSearcher;
7172
import org.apache.lucene.search.MatchAllDocsQuery;
7273
import org.apache.lucene.search.ReferenceManager;
@@ -142,6 +143,7 @@
142143
import org.opensearch.index.seqno.RetentionLeases;
143144
import org.opensearch.index.seqno.SeqNoStats;
144145
import org.opensearch.index.seqno.SequenceNumbers;
146+
import org.opensearch.index.shard.OpenSearchMergePolicy;
145147
import org.opensearch.index.shard.ShardUtils;
146148
import org.opensearch.index.store.Store;
147149
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
@@ -201,6 +203,8 @@
201203
import java.util.stream.LongStream;
202204

203205
import static java.util.Collections.shuffle;
206+
import static org.opensearch.index.IndexSettings.DEFAULT_POLICY;
207+
import static org.opensearch.index.IndexSettings.MERGE_ON_FLUSH_MERGE_POLICY;
204208
import static org.opensearch.index.engine.Engine.Operation.Origin.LOCAL_RESET;
205209
import static org.opensearch.index.engine.Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY;
206210
import static org.opensearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
@@ -6604,6 +6608,77 @@ public void testStressShouldPeriodicallyFlush() throws Exception {
66046608
}
66056609
}
66066610

6611+
public void testMultiSettingsDynamicForMerge() {
6612+
boolean checkPendingFlushEnabled = true;
6613+
boolean mergeOnFlushEnabled = true;
6614+
TimeValue maxFullFlushMergeWaitTime = TimeValue.timeValueSeconds(1);
6615+
String indexMergeOnPlushPolicy = MERGE_ON_FLUSH_MERGE_POLICY;
6616+
final IndexSettings indexSettings = engine.config().getIndexSettings();
6617+
IndexMetadata indexMetadata = IndexMetadata.builder(indexSettings.getIndexMetadata())
6618+
.settings(
6619+
Settings.builder()
6620+
.put(indexSettings.getSettings())
6621+
.put(IndexSettings.INDEX_CHECK_PENDING_FLUSH_ENABLED.getKey(), checkPendingFlushEnabled)
6622+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), mergeOnFlushEnabled)
6623+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), maxFullFlushMergeWaitTime)
6624+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), indexMergeOnPlushPolicy)
6625+
)
6626+
.build();
6627+
indexSettings.updateIndexMetadata(indexMetadata);
6628+
engine.onSettingsChanged(
6629+
indexSettings.getTranslogRetentionAge(),
6630+
indexSettings.getTranslogRetentionSize(),
6631+
indexSettings.getSoftDeleteRetentionOperations()
6632+
);
6633+
assertEquals(checkPendingFlushEnabled, engine.getCurrentIndexWriterConfig().isCheckPendingFlushOnUpdate());
6634+
assertEquals(maxFullFlushMergeWaitTime.millis(), engine.getCurrentIndexWriterConfig().getMaxFullFlushMergeWaitMillis());
6635+
MergePolicy mergePolicy = engine.getCurrentIndexWriterConfig().getMergePolicy();
6636+
assertTrue(mergePolicy instanceof OpenSearchMergePolicy);
6637+
assertTrue(((OpenSearchMergePolicy) mergePolicy).getDelegate() instanceof MergeOnFlushMergePolicy);
6638+
6639+
indexMergeOnPlushPolicy = DEFAULT_POLICY;
6640+
indexMetadata = IndexMetadata.builder(indexSettings.getIndexMetadata())
6641+
.settings(
6642+
Settings.builder()
6643+
.put(indexSettings.getSettings())
6644+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), indexMergeOnPlushPolicy)
6645+
)
6646+
.build();
6647+
indexSettings.updateIndexMetadata(indexMetadata);
6648+
engine.onSettingsChanged(
6649+
indexSettings.getTranslogRetentionAge(),
6650+
indexSettings.getTranslogRetentionSize(),
6651+
indexSettings.getSoftDeleteRetentionOperations()
6652+
);
6653+
mergePolicy = engine.getCurrentIndexWriterConfig().getMergePolicy();
6654+
assertTrue(mergePolicy instanceof OpenSearchMergePolicy);
6655+
assertTrue(((OpenSearchMergePolicy) mergePolicy).getDelegate() instanceof ShuffleForcedMergePolicy);
6656+
6657+
mergeOnFlushEnabled = false;
6658+
checkPendingFlushEnabled = false;
6659+
indexMetadata = IndexMetadata.builder(indexSettings.getIndexMetadata())
6660+
.settings(
6661+
Settings.builder()
6662+
.put(indexSettings.getSettings())
6663+
.put(IndexSettings.INDEX_CHECK_PENDING_FLUSH_ENABLED.getKey(), checkPendingFlushEnabled)
6664+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), mergeOnFlushEnabled)
6665+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), maxFullFlushMergeWaitTime)
6666+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), "merge-on-flush")
6667+
)
6668+
.build();
6669+
indexSettings.updateIndexMetadata(indexMetadata);
6670+
engine.onSettingsChanged(
6671+
indexSettings.getTranslogRetentionAge(),
6672+
indexSettings.getTranslogRetentionSize(),
6673+
indexSettings.getSoftDeleteRetentionOperations()
6674+
);
6675+
assertEquals(checkPendingFlushEnabled, engine.getCurrentIndexWriterConfig().isCheckPendingFlushOnUpdate());
6676+
assertEquals(0, engine.getCurrentIndexWriterConfig().getMaxFullFlushMergeWaitMillis());
6677+
mergePolicy = engine.getCurrentIndexWriterConfig().getMergePolicy();
6678+
assertTrue(mergePolicy instanceof OpenSearchMergePolicy);
6679+
assertTrue(((OpenSearchMergePolicy) mergePolicy).getDelegate() instanceof ShuffleForcedMergePolicy);
6680+
}
6681+
66076682
public void testStressUpdateSameDocWhileGettingIt() throws IOException, InterruptedException {
66086683
final int iters = randomIntBetween(1, 15);
66096684
for (int i = 0; i < iters; i++) {

0 commit comments

Comments
 (0)