Skip to content

Commit f282a91

Browse files
guojialiang92vinaykpud
authored andcommitted
[merged segment warmer] support cleanup redundant pending merged segments (opensearch-project#18720)
* support cleanup redundant pending merged segments Signed-off-by: guojialiang <[email protected]> * fix ut Signed-off-by: guojialiang <[email protected]> * add test Signed-off-by: guojialiang <[email protected]> * add testPublishReferencedSegmentsTask Signed-off-by: guojialiang <[email protected]> * add clean pending merge segment unit test to SegmentReplicationIndexShardTests Signed-off-by: guojialiang <[email protected]> * update Signed-off-by: guojialiang <[email protected]> * modify license Signed-off-by: guojialiang <[email protected]> * only updatePublishReferencedSegmentsTask if the publishReferencedSegmentsInterval has changed Signed-off-by: guojialiang <[email protected]> * add log Signed-off-by: guojialiang <[email protected]> * add note Signed-off-by: guojialiang <[email protected]> * filter .si files Signed-off-by: guojialiang <[email protected]> * add assert Signed-off-by: guojialiang <[email protected]> * add string constant Signed-off-by: guojialiang <[email protected]> * add LatchedActionListener to testPublishReferencedSegmentsOnPrimary Signed-off-by: guojialiang <[email protected]> * refactor code Signed-off-by: guojialiang <[email protected]> * refactor code Signed-off-by: guojialiang <[email protected]> * fix ut Signed-off-by: guojialiang <[email protected]> --------- Signed-off-by: guojialiang <[email protected]>
1 parent 2718266 commit f282a91

34 files changed

+1169
-68
lines changed

server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
import org.opensearch.indices.IndicesService;
9090
import org.opensearch.indices.recovery.RecoveryState;
9191
import org.opensearch.indices.replication.checkpoint.MergedSegmentPublisher;
92+
import org.opensearch.indices.replication.checkpoint.ReferencedSegmentsPublisher;
9293
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
9394
import org.opensearch.plugins.Plugin;
9495
import org.opensearch.search.builder.SearchSourceBuilder;
@@ -731,7 +732,8 @@ public static final IndexShard newIndexShard(
731732
() -> indexService.getIndexSettings().getRefreshInterval(),
732733
indexService.getRefreshMutex(),
733734
clusterService.getClusterApplierService(),
734-
MergedSegmentPublisher.EMPTY
735+
MergedSegmentPublisher.EMPTY,
736+
ReferencedSegmentsPublisher.EMPTY
735737
);
736738
}
737739

server/src/internalClusterTest/java/org/opensearch/indices/replication/MergedSegmentWarmerIT.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,28 +9,35 @@
99
package org.opensearch.indices.replication;
1010

1111
import org.apache.logging.log4j.Logger;
12+
import org.apache.lucene.store.Directory;
1213
import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest;
1314
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
1415
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
1516
import org.opensearch.action.admin.indices.segments.ShardSegments;
1617
import org.opensearch.action.support.WriteRequest;
1718
import org.opensearch.common.settings.Settings;
19+
import org.opensearch.common.unit.TimeValue;
1820
import org.opensearch.common.util.FeatureFlags;
1921
import org.opensearch.common.util.set.Sets;
2022
import org.opensearch.index.IndexSettings;
2123
import org.opensearch.index.TieredMergePolicyProvider;
2224
import org.opensearch.index.engine.Segment;
25+
import org.opensearch.index.shard.IndexShard;
2326
import org.opensearch.index.store.StoreFileMetadata;
2427
import org.opensearch.test.OpenSearchIntegTestCase;
2528
import org.opensearch.test.transport.MockTransportService;
29+
import org.opensearch.transport.ConnectTransportException;
2630
import org.opensearch.transport.TransportService;
2731

2832
import java.util.Set;
2933
import java.util.concurrent.CountDownLatch;
3034
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.atomic.AtomicBoolean;
3136
import java.util.concurrent.atomic.AtomicInteger;
3237
import java.util.stream.Collectors;
3338

39+
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
40+
3441
/**
3542
* This class runs Segment Replication Integ test suite with merged segment warmer enabled.
3643
*/
@@ -175,6 +182,87 @@ public void testMergeSegmentWarmerWithInactiveReplica() throws Exception {
175182
assertEquals(1, response.getIndices().get(INDEX_NAME).getShards().values().size());
176183
}
177184

185+
// Construct a case with redundant pending merge segments in replica shard, and finally delete these files
186+
public void testCleanupRedundantPendingMergeFile() throws Exception {
187+
final String primaryNode = internalCluster().startDataOnlyNode();
188+
createIndex(
189+
INDEX_NAME,
190+
Settings.builder()
191+
.put(indexSettings())
192+
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), 5)
193+
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), 5)
194+
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false)
195+
.build()
196+
);
197+
ensureYellowAndNoInitializingShards(INDEX_NAME);
198+
final String replicaNode = internalCluster().startDataOnlyNode();
199+
ensureGreen(INDEX_NAME);
200+
201+
AtomicBoolean forceMergeComplete = new AtomicBoolean(false);
202+
MockTransportService primaryTransportService = ((MockTransportService) internalCluster().getInstance(
203+
TransportService.class,
204+
primaryNode
205+
));
206+
207+
primaryTransportService.addSendBehavior(
208+
internalCluster().getInstance(TransportService.class, replicaNode),
209+
(connection, requestId, action, request, options) -> {
210+
if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK)) {
211+
if (forceMergeComplete.get() == false) {
212+
logger.trace("mock connection exception");
213+
throw new ConnectTransportException(connection.getNode(), "mock connection exception");
214+
}
215+
216+
}
217+
connection.sendRequest(requestId, action, request, options);
218+
}
219+
);
220+
221+
for (int i = 0; i < 30; i++) {
222+
client().prepareIndex(INDEX_NAME)
223+
.setId(String.valueOf(i))
224+
.setSource("foo" + i, "bar" + i)
225+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
226+
.get();
227+
}
228+
229+
IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);
230+
assertBusy(() -> assertFalse(replicaShard.getPendingMergedSegmentCheckpoints().isEmpty()));
231+
232+
client().admin().indices().forceMerge(new ForceMergeRequest(INDEX_NAME).maxNumSegments(1)).get();
233+
forceMergeComplete.set(true);
234+
235+
// Verify replica shard has pending merged segments
236+
assertBusy(() -> { assertFalse(replicaShard.getPendingMergedSegmentCheckpoints().isEmpty()); }, 1, TimeUnit.MINUTES);
237+
238+
waitForSegmentCount(INDEX_NAME, 1, logger);
239+
primaryTransportService.clearAllRules();
240+
241+
assertAcked(
242+
client().admin()
243+
.indices()
244+
.prepareUpdateSettings(INDEX_NAME)
245+
.setSettings(
246+
Settings.builder()
247+
.put(IndexSettings.INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1))
248+
)
249+
);
250+
251+
assertBusy(() -> {
252+
IndexShard primaryShard = getIndexShard(primaryNode, INDEX_NAME);
253+
Directory primaryDirectory = primaryShard.store().directory();
254+
Set<String> primaryFiles = Sets.newHashSet(primaryDirectory.listAll());
255+
primaryFiles.removeIf(f -> f.startsWith("segment"));
256+
Directory replicaDirectory = replicaShard.store().directory();
257+
Set<String> replicaFiles = Sets.newHashSet(replicaDirectory.listAll());
258+
replicaFiles.removeIf(f -> f.startsWith("segment"));
259+
// Verify replica shard does not have pending merged segments
260+
assertEquals(0, replicaShard.getPendingMergedSegmentCheckpoints().size());
261+
// Verify that primary shard and replica shard have the same file list
262+
assertEquals(primaryFiles, replicaFiles);
263+
}, 1, TimeUnit.MINUTES);
264+
}
265+
178266
public static void waitForSegmentCount(String indexName, int segmentCount, Logger logger) throws Exception {
179267
assertBusy(() -> {
180268
Set<String> primarySegments = Sets.newHashSet();

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
159159
IndexSettings.MAX_TERMS_COUNT_SETTING,
160160
IndexSettings.MAX_NESTED_QUERY_DEPTH_SETTING,
161161
IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING,
162+
IndexSettings.INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING,
162163
IndexSettings.DEFAULT_FIELD_SETTING,
163164
IndexSettings.QUERY_STRING_LENIENT_SETTING,
164165
IndexSettings.ALLOW_UNMAPPED,

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@
106106
import org.opensearch.indices.recovery.RecoverySettings;
107107
import org.opensearch.indices.recovery.RecoveryState;
108108
import org.opensearch.indices.replication.checkpoint.MergedSegmentPublisher;
109+
import org.opensearch.indices.replication.checkpoint.ReferencedSegmentsPublisher;
109110
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
110111
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
111112
import org.opensearch.plugins.IndexStorePlugin;
@@ -178,6 +179,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
178179
private volatile AsyncGlobalCheckpointTask globalCheckpointTask;
179180
private volatile AsyncRetentionLeaseSyncTask retentionLeaseSyncTask;
180181
private volatile AsyncReplicationTask asyncReplicationTask;
182+
private volatile AsyncPublishReferencedSegmentsTask asyncPublishReferencedSegmentsTask;
181183

182184
// don't convert to Setting<> and register... we only set this in tests and register via a plugin
183185
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";
@@ -330,6 +332,9 @@ public IndexService(
330332
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
331333
}
332334
this.asyncReplicationTask = new AsyncReplicationTask(this);
335+
if (FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING)) {
336+
this.asyncPublishReferencedSegmentsTask = new AsyncPublishReferencedSegmentsTask(this);
337+
}
333338
this.translogFactorySupplier = translogFactorySupplier;
334339
this.recoverySettings = recoverySettings;
335340
this.remoteStoreSettings = remoteStoreSettings;
@@ -443,6 +448,11 @@ AsyncReplicationTask getReplicationTask() {
443448
return asyncReplicationTask;
444449
}
445450

451+
// visible for tests
452+
AsyncPublishReferencedSegmentsTask getPublishReferencedSegmentsTask() {
453+
return asyncPublishReferencedSegmentsTask;
454+
}
455+
446456
/**
447457
* Context for index creation
448458
*
@@ -614,6 +624,7 @@ public synchronized IndexShard createShard(
614624
sourceNode,
615625
discoveryNodes,
616626
mergedSegmentWarmerFactory,
627+
null,
617628
null
618629
);
619630
}
@@ -629,7 +640,8 @@ public synchronized IndexShard createShard(
629640
@Nullable DiscoveryNode sourceNode,
630641
DiscoveryNodes discoveryNodes,
631642
MergedSegmentWarmerFactory mergedSegmentWarmerFactory,
632-
MergedSegmentPublisher mergedSegmentPublisher
643+
MergedSegmentPublisher mergedSegmentPublisher,
644+
ReferencedSegmentsPublisher referencedSegmentsPublisher
633645
) throws IOException {
634646
Objects.requireNonNull(retentionLeaseSyncer);
635647
/*
@@ -769,7 +781,8 @@ protected void closeInternal() {
769781
this::getRefreshInterval,
770782
refreshMutex,
771783
clusterService.getClusterApplierService(),
772-
this.indexSettings.isSegRepEnabledOrRemoteNode() ? mergedSegmentPublisher : null
784+
this.indexSettings.isSegRepEnabledOrRemoteNode() ? mergedSegmentPublisher : null,
785+
this.indexSettings.isSegRepEnabledOrRemoteNode() ? referencedSegmentsPublisher : null
773786
);
774787
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
775788
eventListener.afterIndexShardCreated(indexShard);
@@ -1176,6 +1189,9 @@ public synchronized void updateMetadata(final IndexMetadata currentIndexMetadata
11761189
onRefreshIntervalChange();
11771190
updateFsyncTaskIfNecessary();
11781191
updateReplicationTask();
1192+
if (FeatureFlags.isEnabled(FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING)) {
1193+
updatePublishReferencedSegmentsTask();
1194+
}
11791195
}
11801196

11811197
metadataListeners.forEach(c -> c.accept(newIndexMetadata));
@@ -1189,6 +1205,17 @@ private void updateReplicationTask() {
11891205
}
11901206
}
11911207

1208+
private void updatePublishReferencedSegmentsTask() {
1209+
if (getIndexSettings().getPublishReferencedSegmentsInterval().equals(asyncPublishReferencedSegmentsTask.getInterval())) {
1210+
return;
1211+
}
1212+
try {
1213+
asyncPublishReferencedSegmentsTask.close();
1214+
} finally {
1215+
asyncPublishReferencedSegmentsTask = new AsyncPublishReferencedSegmentsTask(this);
1216+
}
1217+
}
1218+
11921219
/**
11931220
* Called whenever the cluster level {@code cluster.default.index.max_merge_at_once} changes.
11941221
*/
@@ -1516,6 +1543,52 @@ private void maybeSyncSegments(boolean force) {
15161543
}
15171544
}
15181545

1546+
/**
1547+
* Publish primary shard referenced segments in a defined interval.
1548+
*
1549+
* @opensearch.internal
1550+
*/
1551+
final class AsyncPublishReferencedSegmentsTask extends BaseAsyncTask {
1552+
1553+
AsyncPublishReferencedSegmentsTask(IndexService indexService) {
1554+
super(indexService, indexService.getIndexSettings().getPublishReferencedSegmentsInterval());
1555+
}
1556+
1557+
@Override
1558+
protected void runInternal() {
1559+
indexService.maybePublishReferencedSegments();
1560+
}
1561+
1562+
@Override
1563+
protected String getThreadPool() {
1564+
return ThreadPool.Names.GENERIC;
1565+
}
1566+
1567+
@Override
1568+
public String toString() {
1569+
return "publish_primary_referenced_segments";
1570+
}
1571+
1572+
@Override
1573+
protected boolean mustReschedule() {
1574+
return indexSettings.isSegRepEnabledOrRemoteNode() && super.mustReschedule();
1575+
}
1576+
}
1577+
1578+
private void maybePublishReferencedSegments() {
1579+
for (IndexShard shard : this.shards.values()) {
1580+
try {
1581+
// Only the primary shard publish referenced segments.
1582+
// The replicas cleans up the redundant pending merge segments according to the primary shard request.
1583+
if (shard.isPrimaryMode() && shard.routingEntry().active()) {
1584+
shard.publishReferencedSegments();
1585+
}
1586+
} catch (IOException ex) {
1587+
logger.warn(() -> new ParameterizedMessage("failed to publish primary referenced segments"), ex);
1588+
}
1589+
}
1590+
}
1591+
15191592
final class AsyncTrimTranslogTask extends BaseAsyncTask {
15201593

15211594
AsyncTrimTranslogTask(IndexService indexService) {

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,15 @@ public static IndexMergePolicy fromString(String text) {
170170
Property.Dynamic,
171171
Property.IndexScope
172172
);
173+
174+
public static final Setting<TimeValue> INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING = Setting.timeSetting(
175+
"index.segment_replication.publish_referenced_segments_interval",
176+
TimeValue.timeValueMinutes(10),
177+
TimeValue.timeValueSeconds(1),
178+
Property.Dynamic,
179+
Property.IndexScope
180+
);
181+
173182
public static final Setting<TimeValue> INDEX_SEARCH_IDLE_AFTER = Setting.timeSetting(
174183
"index.search.idle.after",
175184
TimeValue.timeValueSeconds(30),
@@ -813,6 +822,7 @@ public static IndexMergePolicy fromString(String text) {
813822
private final boolean defaultAllowUnmappedFields;
814823
private volatile Translog.Durability durability;
815824
private volatile TimeValue syncInterval;
825+
private volatile TimeValue publishReferencedSegmentsInterval;
816826
private volatile TimeValue refreshInterval;
817827
private volatile ByteSizeValue flushThresholdSize;
818828
private volatile TimeValue translogRetentionAge;
@@ -1025,6 +1035,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
10251035
this.durability = scopedSettings.get(INDEX_TRANSLOG_DURABILITY_SETTING);
10261036
defaultFields = scopedSettings.get(DEFAULT_FIELD_SETTING);
10271037
syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings);
1038+
publishReferencedSegmentsInterval = INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING.get(settings);
10281039
refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
10291040
flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
10301041
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
@@ -1146,6 +1157,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
11461157
scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.AUTO_THROTTLE_SETTING, mergeSchedulerConfig::setAutoThrottle);
11471158
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_DURABILITY_SETTING, this::setTranslogDurability);
11481159
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_SYNC_INTERVAL_SETTING, this::setTranslogSyncInterval);
1160+
scopedSettings.addSettingsUpdateConsumer(
1161+
INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING,
1162+
this::setPublishReferencedSegmentsInterval
1163+
);
11491164
scopedSettings.addSettingsUpdateConsumer(MAX_RESULT_WINDOW_SETTING, this::setMaxResultWindow);
11501165
scopedSettings.addSettingsUpdateConsumer(MAX_INNER_RESULT_WINDOW_SETTING, this::setMaxInnerResultWindow);
11511166
scopedSettings.addSettingsUpdateConsumer(MAX_ADJACENCY_MATRIX_FILTERS_SETTING, this::setMaxAdjacencyMatrixFilters);
@@ -1524,6 +1539,14 @@ public void setTranslogSyncInterval(TimeValue translogSyncInterval) {
15241539
this.syncInterval = translogSyncInterval;
15251540
}
15261541

1542+
public TimeValue getPublishReferencedSegmentsInterval() {
1543+
return publishReferencedSegmentsInterval;
1544+
}
1545+
1546+
public void setPublishReferencedSegmentsInterval(TimeValue publishReferencedSegmentsInterval) {
1547+
this.publishReferencedSegmentsInterval = publishReferencedSegmentsInterval;
1548+
}
1549+
15271550
/**
15281551
* Returns the translog sync/upload buffer interval when remote translog store is enabled and index setting
15291552
* {@code index.translog.durability} is set as {@code request}.

0 commit comments

Comments
 (0)