Skip to content

Commit cb4b0f1

Browse files
committed
Make index.merge_on_flush.enabled, index.merge_on_flush.max_full_flush_merge_wait_time, index.merge_on_flush.policy, index.check_pending_flush.enabled dynamic
Signed-off-by: kkewwei <[email protected]> Signed-off-by: kkewwei <[email protected]>
1 parent 9a3fc30 commit cb4b0f1

File tree

4 files changed

+121
-11
lines changed

4 files changed

+121
-11
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2929
- With creation of FilterFieldType, we need unwrap all the MappedFieldType before using the instanceof check. ([#17951](https://github.com/opensearch-project/OpenSearch/pull/17951))
3030
- Fix simultaneously creating a snapshot and updating the repository can potentially trigger an infinite loop ([#17532](https://github.com/opensearch-project/OpenSearch/pull/17532))
3131
- Remove package org.opensearch.transport.grpc and replace with org.opensearch.plugin.transport.grpc ([#18031](https://github.com/opensearch-project/OpenSearch/pull/18031))
32+
- Make `index.merge_on_flush.enabled`, `index.merge_on_flush.max_full_flush_merge_wait_time`, `index.merge_on_flush.policy`, `index.check_pending_flush.enabled` dynamic ([#17495](https://github.com/opensearch-project/OpenSearch/pull/17495))
3233

3334
### Security
3435

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@
9494
*/
9595
@PublicApi(since = "1.0.0")
9696
public final class IndexSettings {
97-
private static final String DEFAULT_POLICY = "default";
98-
private static final String MERGE_ON_FLUSH_MERGE_POLICY = "merge-on-flush";
97+
public static final String DEFAULT_POLICY = "default";
98+
public static final String MERGE_ON_FLUSH_MERGE_POLICY = "merge-on-flush";
9999

100100
/**
101101
* Enum representing supported merge policies
@@ -648,7 +648,8 @@ public static IndexMergePolicy fromString(String text) {
648648
public static final Setting<Boolean> INDEX_CHECK_PENDING_FLUSH_ENABLED = Setting.boolSetting(
649649
"index.check_pending_flush.enabled",
650650
true,
651-
Property.IndexScope
651+
Property.IndexScope,
652+
Property.Dynamic
652653
);
653654

654655
public static final Setting<String> TIME_SERIES_INDEX_MERGE_POLICY = Setting.simpleString(
@@ -902,7 +903,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
902903
/**
903904
* Is flush check by write threads enabled or not
904905
*/
905-
private final boolean checkPendingFlushEnabled;
906+
private volatile boolean checkPendingFlushEnabled;
906907
/**
907908
* Is fuzzy set enabled for doc id
908909
*/
@@ -1200,6 +1201,11 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
12001201
IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING,
12011202
this::setRemoteStoreTranslogRepository
12021203
);
1204+
scopedSettings.addSettingsUpdateConsumer(INDEX_CHECK_PENDING_FLUSH_ENABLED, this::setCheckPendingFlushEnabled);
1205+
}
1206+
1207+
public void setCheckPendingFlushEnabled(boolean checkPendingFlushEnabled) {
1208+
this.checkPendingFlushEnabled = checkPendingFlushEnabled;
12031209
}
12041210

12051211
private void setSearchIdleAfter(TimeValue searchIdleAfter) {

server/src/main/java/org/opensearch/index/engine/InternalEngine.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ public class InternalEngine extends Engine {
157157
protected volatile long lastDeleteVersionPruneTimeMSec;
158158

159159
protected final TranslogManager translogManager;
160+
private final IndexWriterConfig indexWriterConfig;
160161
protected final IndexWriter indexWriter;
161162
protected final LocalCheckpointTracker localCheckpointTracker;
162163
protected final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
@@ -172,6 +173,7 @@ public class InternalEngine extends Engine {
172173
protected final String historyUUID;
173174

174175
private final OpenSearchConcurrentMergeScheduler mergeScheduler;
176+
private MergePolicy noMergeOnFlushPolicy;
175177
private final ExternalReaderManager externalReaderManager;
176178
private final OpenSearchReaderManager internalReaderManager;
177179

@@ -293,7 +295,8 @@ public void onFailure(String reason, Exception ex) {
293295
translogManager::getLastSyncedGlobalCheckpoint
294296
);
295297
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
296-
writer = createWriter();
298+
this.indexWriterConfig = getIndexWriterConfig();
299+
writer = createWriter(indexWriterConfig);
297300
bootstrapAppendOnlyInfoFromWriter(writer);
298301
final Map<String, String> commitData = commitDataAsMap(writer);
299302
historyUUID = loadHistoryUUID(commitData);
@@ -2304,9 +2307,8 @@ protected final ReferenceManager<OpenSearchDirectoryReader> getReferenceManager(
23042307
}
23052308
}
23062309

2307-
private IndexWriter createWriter() throws IOException {
2310+
private IndexWriter createWriter(IndexWriterConfig iwc) throws IOException {
23082311
try {
2309-
final IndexWriterConfig iwc = getIndexWriterConfig();
23102312
return createWriter(store.directory(), iwc);
23112313
} catch (LockObtainFailedException ex) {
23122314
logger.warn("could not lock IndexWriter", ex);
@@ -2337,26 +2339,27 @@ private IndexWriterConfig getIndexWriterConfig() {
23372339
iwc.setMergeScheduler(mergeScheduler);
23382340
// Give us the opportunity to upgrade old segments while performing
23392341
// background merges
2340-
MergePolicy mergePolicy = config().getMergePolicy();
2342+
this.noMergeOnFlushPolicy = config().getMergePolicy();
23412343
// always configure soft-deletes field so an engine with soft-deletes disabled can open a Lucene index with soft-deletes.
23422344
iwc.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
2343-
mergePolicy = new RecoverySourcePruneMergePolicy(
2345+
noMergeOnFlushPolicy = new RecoverySourcePruneMergePolicy(
23442346
SourceFieldMapper.RECOVERY_SOURCE_NAME,
23452347
softDeletesPolicy::getRetentionQuery,
23462348
new SoftDeletesRetentionMergePolicy(
23472349
Lucene.SOFT_DELETES_FIELD,
23482350
softDeletesPolicy::getRetentionQuery,
2349-
new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME)
2351+
new PrunePostingsMergePolicy(noMergeOnFlushPolicy, IdFieldMapper.NAME)
23502352
)
23512353
);
23522354
boolean shuffleForcedMerge = Booleans.parseBoolean(System.getProperty("opensearch.shuffle_forced_merge", Boolean.TRUE.toString()));
23532355
if (shuffleForcedMerge) {
23542356
// We wrap the merge policy for all indices even though it is mostly useful for time-based indices
23552357
// but there should be no overhead for other type of indices so it's simpler than adding a setting
23562358
// to enable it.
2357-
mergePolicy = new ShuffleForcedMergePolicy(mergePolicy);
2359+
noMergeOnFlushPolicy = new ShuffleForcedMergePolicy(noMergeOnFlushPolicy);
23582360
}
23592361

2362+
MergePolicy mergePolicy = noMergeOnFlushPolicy;
23602363
if (config().getIndexSettings().isMergeOnFlushEnabled()) {
23612364
final long maxFullFlushMergeWaitMillis = config().getIndexSettings().getMaxFullFlushMergeWaitTime().millis();
23622365
if (maxFullFlushMergeWaitMillis > 0) {
@@ -2610,6 +2613,31 @@ public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue tran
26102613
// the setting will be re-interpreted if it's set to true
26112614
updateAutoIdTimestamp(Long.MAX_VALUE, true);
26122615
}
2616+
IndexSettings indexSettings = engineConfig.getIndexSettings();
2617+
if (indexSettings.isCheckPendingFlushEnabled() != indexWriterConfig.isCheckPendingFlushOnUpdate()) {
2618+
indexWriterConfig.setCheckPendingFlushUpdate(indexSettings.isCheckPendingFlushEnabled());
2619+
}
2620+
if (indexSettings.isMergeOnFlushEnabled()) {
2621+
final long maxFullFlushMergeWaitMillis = indexSettings.getMaxFullFlushMergeWaitTime().millis();
2622+
if (maxFullFlushMergeWaitMillis > 0) {
2623+
indexWriterConfig.setMaxFullFlushMergeWaitMillis(maxFullFlushMergeWaitMillis);
2624+
final Optional<UnaryOperator<MergePolicy>> mergeOnFlushPolicy = indexSettings.getMergeOnFlushPolicy();
2625+
if (mergeOnFlushPolicy.isPresent()) {
2626+
indexWriterConfig.setMergePolicy(new OpenSearchMergePolicy(mergeOnFlushPolicy.get().apply(noMergeOnFlushPolicy)));
2627+
} else {
2628+
indexWriterConfig.setMergePolicy(new OpenSearchMergePolicy(noMergeOnFlushPolicy));
2629+
}
2630+
} else {
2631+
logger.warn(
2632+
"The {} is enabled but {} is set to 0, merge on flush will not be activated",
2633+
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(),
2634+
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey()
2635+
);
2636+
}
2637+
} else {
2638+
indexWriterConfig.setMaxFullFlushMergeWaitMillis(0);
2639+
indexWriterConfig.setMergePolicy(new OpenSearchMergePolicy(noMergeOnFlushPolicy));
2640+
}
26132641
final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getDeletionPolicy();
26142642
translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis());
26152643
translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes());

server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.apache.lucene.index.Terms;
6868
import org.apache.lucene.index.TermsEnum;
6969
import org.apache.lucene.index.TieredMergePolicy;
70+
import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
7071
import org.apache.lucene.search.IndexSearcher;
7172
import org.apache.lucene.search.MatchAllDocsQuery;
7273
import org.apache.lucene.search.ReferenceManager;
@@ -142,6 +143,7 @@
142143
import org.opensearch.index.seqno.RetentionLeases;
143144
import org.opensearch.index.seqno.SeqNoStats;
144145
import org.opensearch.index.seqno.SequenceNumbers;
146+
import org.opensearch.index.shard.OpenSearchMergePolicy;
145147
import org.opensearch.index.shard.ShardUtils;
146148
import org.opensearch.index.store.Store;
147149
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
@@ -201,6 +203,8 @@
201203
import java.util.stream.LongStream;
202204

203205
import static java.util.Collections.shuffle;
206+
import static org.opensearch.index.IndexSettings.DEFAULT_POLICY;
207+
import static org.opensearch.index.IndexSettings.MERGE_ON_FLUSH_MERGE_POLICY;
204208
import static org.opensearch.index.engine.Engine.Operation.Origin.LOCAL_RESET;
205209
import static org.opensearch.index.engine.Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY;
206210
import static org.opensearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
@@ -6604,6 +6608,77 @@ public void testStressShouldPeriodicallyFlush() throws Exception {
66046608
}
66056609
}
66066610

6611+
public void testMultiSettingsDynamicForMerge() {
6612+
boolean checkPendingFlushEnabled = true;
6613+
boolean mergeOnFlushEnabled = true;
6614+
TimeValue maxFullFlushMergeWaitTime = TimeValue.timeValueSeconds(1);
6615+
String indexMergeOnPlushPolicy = MERGE_ON_FLUSH_MERGE_POLICY;
6616+
final IndexSettings indexSettings = engine.config().getIndexSettings();
6617+
IndexMetadata indexMetadata = IndexMetadata.builder(indexSettings.getIndexMetadata())
6618+
.settings(
6619+
Settings.builder()
6620+
.put(indexSettings.getSettings())
6621+
.put(IndexSettings.INDEX_CHECK_PENDING_FLUSH_ENABLED.getKey(), checkPendingFlushEnabled)
6622+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), mergeOnFlushEnabled)
6623+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), maxFullFlushMergeWaitTime)
6624+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), indexMergeOnPlushPolicy)
6625+
)
6626+
.build();
6627+
indexSettings.updateIndexMetadata(indexMetadata);
6628+
engine.onSettingsChanged(
6629+
indexSettings.getTranslogRetentionAge(),
6630+
indexSettings.getTranslogRetentionSize(),
6631+
indexSettings.getSoftDeleteRetentionOperations()
6632+
);
6633+
assertEquals(checkPendingFlushEnabled, engine.getCurrentIndexWriterConfig().isCheckPendingFlushOnUpdate());
6634+
assertEquals(maxFullFlushMergeWaitTime.millis(), engine.getCurrentIndexWriterConfig().getMaxFullFlushMergeWaitMillis());
6635+
MergePolicy mergePolicy = engine.getCurrentIndexWriterConfig().getMergePolicy();
6636+
assertTrue(mergePolicy instanceof OpenSearchMergePolicy);
6637+
assertTrue(((OpenSearchMergePolicy) mergePolicy).getDelegate() instanceof MergeOnFlushMergePolicy);
6638+
6639+
indexMergeOnPlushPolicy = DEFAULT_POLICY;
6640+
indexMetadata = IndexMetadata.builder(indexSettings.getIndexMetadata())
6641+
.settings(
6642+
Settings.builder()
6643+
.put(indexSettings.getSettings())
6644+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), indexMergeOnPlushPolicy)
6645+
)
6646+
.build();
6647+
indexSettings.updateIndexMetadata(indexMetadata);
6648+
engine.onSettingsChanged(
6649+
indexSettings.getTranslogRetentionAge(),
6650+
indexSettings.getTranslogRetentionSize(),
6651+
indexSettings.getSoftDeleteRetentionOperations()
6652+
);
6653+
mergePolicy = engine.getCurrentIndexWriterConfig().getMergePolicy();
6654+
assertTrue(mergePolicy instanceof OpenSearchMergePolicy);
6655+
assertTrue(((OpenSearchMergePolicy) mergePolicy).getDelegate() instanceof ShuffleForcedMergePolicy);
6656+
6657+
mergeOnFlushEnabled = false;
6658+
checkPendingFlushEnabled = false;
6659+
indexMetadata = IndexMetadata.builder(indexSettings.getIndexMetadata())
6660+
.settings(
6661+
Settings.builder()
6662+
.put(indexSettings.getSettings())
6663+
.put(IndexSettings.INDEX_CHECK_PENDING_FLUSH_ENABLED.getKey(), checkPendingFlushEnabled)
6664+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), mergeOnFlushEnabled)
6665+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), maxFullFlushMergeWaitTime)
6666+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), "merge-on-flush")
6667+
)
6668+
.build();
6669+
indexSettings.updateIndexMetadata(indexMetadata);
6670+
engine.onSettingsChanged(
6671+
indexSettings.getTranslogRetentionAge(),
6672+
indexSettings.getTranslogRetentionSize(),
6673+
indexSettings.getSoftDeleteRetentionOperations()
6674+
);
6675+
assertEquals(checkPendingFlushEnabled, engine.getCurrentIndexWriterConfig().isCheckPendingFlushOnUpdate());
6676+
assertEquals(0, engine.getCurrentIndexWriterConfig().getMaxFullFlushMergeWaitMillis());
6677+
mergePolicy = engine.getCurrentIndexWriterConfig().getMergePolicy();
6678+
assertTrue(mergePolicy instanceof OpenSearchMergePolicy);
6679+
assertTrue(((OpenSearchMergePolicy) mergePolicy).getDelegate() instanceof ShuffleForcedMergePolicy);
6680+
}
6681+
66076682
public void testStressUpdateSameDocWhileGettingIt() throws IOException, InterruptedException {
66086683
final int iters = randomIntBetween(1, 15);
66096684
for (int i = 0; i < iters; i++) {

0 commit comments

Comments
 (0)