1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add build-tooling to run in FIPS environment ([#18921](https://github.com/opensearch-project/OpenSearch/pull/18921))
- Add SMILE/CBOR/YAML document format support to Bulk GRPC endpoint ([#19744](https://github.com/opensearch-project/OpenSearch/pull/19744))
- Implement GRPC ConstantScoreQuery, FuzzyQuery, MatchBoolPrefixQuery, MatchPhrasePrefix, PrefixQuery, MatchQuery ([#19854](https://github.com/opensearch-project/OpenSearch/pull/19854))
- Add async periodic flush task support for pull-based ingestion ([#19878](https://github.com/opensearch-project/OpenSearch/pull/19878))

### Changed
- Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350))
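The pull-based ingestion entry in the Added list above corresponds to the new `index.periodic_flush_interval` setting introduced later in this diff. Purely as a hedged sketch, an index opting into the feature at creation time could be configured as below; the topic, broker address, and interval are placeholders borrowed from the integration tests further down:

// Illustration only: enable a 5 second periodic flush on a Kafka-backed pull-based ingestion index.
Settings ingestionIndexSettings = Settings.builder()
    .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
    .put("ingestion_source.type", "kafka")
    .put("ingestion_source.pointer.init.reset", "earliest")
    .put("ingestion_source.param.topic", "test-topic")                  // placeholder topic name
    .put("ingestion_source.param.bootstrap_servers", "localhost:9092")  // placeholder broker address
    .put("index.periodic_flush_interval", "5s")                         // new setting added by this change
    .build();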
@@ -653,4 +653,131 @@ private boolean validateOffsetBasedLagForPrimaryAndReplica(long expectedLag) {
&& shardTypeToStats.get("replica").getConsumerStats().pointerBasedLag() == expectedLag;
return valid;
}

public void testAllActiveIngestionBatchStartPointerOnReplicaPromotion() throws Exception {
// Step 1: Publish 10 messages
for (int i = 1; i <= 10; i++) {
produceDataWithExternalVersion(String.valueOf(i), 1, "name" + i, "25", defaultMessageTimestamp, "index");
}

// Step 2: Start nodes
internalCluster().startClusterManagerOnlyNode();
final String nodeA = internalCluster().startDataOnlyNode();

// Step 3: Create all-active index
createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.param.topic", topicName)
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("ingestion_source.all_active", true)
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);

ensureGreen(indexName);

// Step 4: Wait for 10 messages to be searchable on nodeA
waitForSearchableDocs(10, Arrays.asList(nodeA));

// Step 5: Flush to persist data
flush(indexName);

// Step 6: Add second node
final String nodeB = internalCluster().startDataOnlyNode();

// Step 7: Relocate shard from nodeA to nodeB
client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(indexName, 0, nodeA, nodeB)).get();
ensureGreen(indexName);
assertTrue(nodeB.equals(primaryNodeName(indexName)));

// Step 8: Publish 1 new message
produceDataWithExternalVersion("11", 1, "name11", "25", defaultMessageTimestamp, "index");

// Step 9: Wait for 11 messages to be visible on nodeB
waitForSearchableDocs(11, Arrays.asList(nodeB));

// Step 10: Flush to persist data
flush(indexName);

// Step 11: Validate processed messages and version conflict count on nodeB
PollingIngestStats nodeBStats = client(nodeB).admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
.getPollingIngestStats();
assertNotNull(nodeBStats);
assertEquals(2L, nodeBStats.getMessageProcessorStats().totalProcessedCount());
assertEquals(1L, nodeBStats.getMessageProcessorStats().totalVersionConflictsCount());

// Step 12: Add third node
final String nodeC = internalCluster().startDataOnlyNode();

// Step 13: Bring down nodeA so the new replica will be allocated to nodeC
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA));

// Step 14: Add a replica (will be allocated to nodeC since only nodeB and nodeC are available)
client().admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
.get();
ensureGreen(indexName);

// Step 15: Wait for 11 messages to be searchable on nodeC (replica)
waitForSearchableDocs(11, Arrays.asList(nodeC));

// Step 16: Bring down nodeB (primary) and wait for nodeC to become primary
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeB));
ensureYellowAndNoInitializingShards(indexName);
assertTrue(nodeC.equals(primaryNodeName(indexName)));

// Step 17: Publish 1 more message
produceDataWithExternalVersion("12", 1, "name12", "25", defaultMessageTimestamp, "index");

// Step 18: Wait for 12 messages to be visible on nodeC
waitForSearchableDocs(12, Arrays.asList(nodeC));

// Step 19: Validate processed messages and version conflict count on nodeC
PollingIngestStats nodeCStats = client(nodeC).admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
.getPollingIngestStats();
assertNotNull(nodeCStats);

assertEquals(2L, nodeCStats.getMessageProcessorStats().totalProcessedCount());
assertEquals(1L, nodeCStats.getMessageProcessorStats().totalVersionConflictsCount());
}

public void testAllActiveIngestionPeriodicFlush() throws Exception {
// Publish 10 messages
for (int i = 1; i <= 10; i++) {
produceData(String.valueOf(i), "name" + i, "25");
}

// Start nodes
internalCluster().startClusterManagerOnlyNode();
final String nodeA = internalCluster().startDataOnlyNode();

// Create all-active index with 5 second periodic flush interval
createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.param.topic", topicName)
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("ingestion_source.all_active", true)
.put("index.periodic_flush_interval", "5s")
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);

ensureGreen(indexName);

waitForSearchableDocs(10, Arrays.asList(nodeA));
waitForState(() -> getPeriodicFlushCount(nodeA, indexName) >= 1);

}
}
@@ -222,4 +222,17 @@ protected void setWriteBlock(String indexName, boolean isWriteBlockEnabled) {
.setSettings(Settings.builder().put("index.blocks.write", isWriteBlockEnabled))
.get();
}

/**
* Gets the periodic flush count for the specified index from the specified node.
*
* @param nodeName the name of the node to query
* @param indexName the name of the index
* @return the periodic flush count
*/
protected long getPeriodicFlushCount(String nodeName, String indexName) {
return client(nodeName).admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0].getStats()
.getFlush()
.getPeriodic();
}
}
@@ -813,4 +813,100 @@ private void verifyRemoteStoreEnabled(String node) {
String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");
assertEquals("Remote store should be enabled", "true", remoteStoreEnabled);
}

public void testBatchStartPointerOnReplicaPromotion() throws Exception {
// Step 1: Publish 10 messages
for (int i = 1; i <= 10; i++) {
produceDataWithExternalVersion(String.valueOf(i), 1, "name" + i, "25", defaultMessageTimestamp, "index");
}

// Step 2: Start nodes
internalCluster().startClusterManagerOnlyNode();
final String nodeA = internalCluster().startDataOnlyNode();

// Step 3: Create index with 1 replica
createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.param.topic", topicName)
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("index.replication.type", "SEGMENT")
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);

ensureYellowAndNoInitializingShards(indexName);

// Step 4: Add second node and verify green status
final String nodeB = internalCluster().startDataOnlyNode();
ensureGreen(indexName);

// Step 5: Verify nodeA has the primary shard
assertTrue(nodeA.equals(primaryNodeName(indexName)));
assertTrue(nodeB.equals(replicaNodeName(indexName)));
verifyRemoteStoreEnabled(nodeA);
verifyRemoteStoreEnabled(nodeB);

// Step 6: Wait for 10 messages to be searchable on both nodes
waitForSearchableDocs(10, Arrays.asList(nodeA, nodeB));

// Step 7: Flush to persist data
flush(indexName);

// Step 8: Bring down nodeA (primary) and wait for nodeB to become primary
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA));
ensureYellowAndNoInitializingShards(indexName);
assertTrue(nodeB.equals(primaryNodeName(indexName)));

// Step 9: Publish 1 new message
produceDataWithExternalVersion("11", 1, "name11", "25", defaultMessageTimestamp, "index");

// Step 10: Wait for 11 messages to be visible on nodeB
waitForSearchableDocs(11, Arrays.asList(nodeB));

// Step 11: Validate version conflict count is exactly 1
PollingIngestStats finalStats = client(nodeB).admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
.getPollingIngestStats();
assertNotNull(finalStats);

assertEquals(1L, finalStats.getMessageProcessorStats().totalVersionConflictsCount());
assertEquals(2L, finalStats.getMessageProcessorStats().totalProcessedCount());
}

public void testPeriodicFlush() throws Exception {
// Publish 10 messages
for (int i = 1; i <= 10; i++) {
produceData(String.valueOf(i), "name" + i, "25");
}

// Start nodes
internalCluster().startClusterManagerOnlyNode();
final String nodeA = internalCluster().startDataOnlyNode();

// Create index with 5 second periodic flush interval
createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "earliest")
.put("ingestion_source.param.topic", topicName)
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("index.replication.type", "SEGMENT")
.put("index.periodic_flush_interval", "5s")
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);

ensureGreen(indexName);
verifyRemoteStoreEnabled(nodeA);

waitForSearchableDocs(10, Arrays.asList(nodeA));
waitForState(() -> getPeriodicFlushCount(nodeA, indexName) >= 1);
}
}
@@ -147,6 +147,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING,
IndexSettings.INDEX_WARMER_ENABLED_SETTING,
IndexSettings.INDEX_REFRESH_INTERVAL_SETTING,
IndexSettings.INDEX_PERIODIC_FLUSH_INTERVAL_SETTING,
IndexSettings.MAX_RESULT_WINDOW_SETTING,
IndexSettings.MAX_INNER_RESULT_WINDOW_SETTING,
IndexSettings.MAX_TOKEN_COUNT_SETTING,
36 changes: 36 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -380,6 +380,28 @@ public static IndexMergePolicy fromString(String text) {
Property.Dynamic,
Property.IndexScope
);

/**
* Periodic flush interval setting. By default, periodic flush is disabled (-1).
* For pull-based ingestion indices, this defaults to 10 minutes to ensure offsets are regularly committed.
*/
public static final TimeValue DEFAULT_PERIODIC_FLUSH_INTERVAL = TimeValue.MINUS_ONE;
public static final TimeValue MINIMUM_PERIODIC_FLUSH_INTERVAL = TimeValue.MINUS_ONE;
public static final Setting<TimeValue> INDEX_PERIODIC_FLUSH_INTERVAL_SETTING = Setting.timeSetting(
"index.periodic_flush_interval",
(settings) -> {
// Default to 10 minutes for pull-based ingestion indices, disabled otherwise
String ingestionSourceType = IndexMetadata.INGESTION_SOURCE_TYPE_SETTING.get(settings);
if (ingestionSourceType != null && !IndexMetadata.NONE_INGESTION_SOURCE_TYPE.equals(ingestionSourceType)) {
return TimeValue.timeValueMinutes(10);
}
return DEFAULT_PERIODIC_FLUSH_INTERVAL;
},
MINIMUM_PERIODIC_FLUSH_INTERVAL,
Property.Dynamic,
Property.IndexScope
);
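Since INDEX_PERIODIC_FLUSH_INTERVAL_SETTING is registered above with Property.Dynamic and Property.IndexScope, it can also be changed on a live index. A minimal sketch in the style of the update-settings call used in the integration tests earlier in this diff; the index name and the test-framework client() helper are assumptions, not part of this change:

// Illustration only: shorten the periodic flush interval on an existing index, or pass "-1" to disable it.
client().admin()
    .indices()
    .prepareUpdateSettings("my-index") // hypothetical index name
    .setSettings(Settings.builder().put("index.periodic_flush_interval", "30s"))
    .get();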

public static final Setting<ByteSizeValue> INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING = Setting.byteSizeSetting(
"index.translog.flush_threshold_size",
new ByteSizeValue(512, ByteSizeUnit.MB),
@@ -839,6 +861,7 @@ public static IndexMergePolicy fromString(String text) {
private volatile TimeValue syncInterval;
private volatile TimeValue publishReferencedSegmentsInterval;
private volatile TimeValue refreshInterval;
private volatile TimeValue periodicFlushInterval;
private volatile ByteSizeValue flushThresholdSize;
private volatile TimeValue translogRetentionAge;
private volatile ByteSizeValue translogRetentionSize;
@@ -1057,6 +1080,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings);
publishReferencedSegmentsInterval = INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING.get(settings);
refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
periodicFlushInterval = scopedSettings.get(INDEX_PERIODIC_FLUSH_INTERVAL_SETTING);
flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
flushAfterMergeThresholdSize = scopedSettings.get(INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING);
@@ -1205,6 +1229,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_AGE_SETTING, this::setTranslogRetentionAge);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_SIZE_SETTING, this::setTranslogRetentionSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval);
scopedSettings.addSettingsUpdateConsumer(INDEX_PERIODIC_FLUSH_INTERVAL_SETTING, this::setPeriodicFlushInterval);
scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners);
scopedSettings.addSettingsUpdateConsumer(MAX_ANALYZED_OFFSET_SETTING, this::setHighlightMaxAnalyzedOffset);
scopedSettings.addSettingsUpdateConsumer(MAX_TERMS_COUNT_SETTING, this::setMaxTermsCount);
@@ -1302,6 +1327,10 @@ private void setRefreshInterval(TimeValue timeValue) {
this.refreshInterval = timeValue;
}

private void setPeriodicFlushInterval(TimeValue timeValue) {
this.periodicFlushInterval = timeValue;
}

/**
* Update the default maxMergesAtOnce
* 1. sets the new default in {@code TieredMergePolicyProvider}
@@ -1644,6 +1673,13 @@ public TimeValue getRefreshInterval() {
return refreshInterval;
}

/**
* Returns the interval at which a periodic flush should be executed. {@code -1} means periodic flush is disabled.
*/
public TimeValue getPeriodicFlushInterval() {
return periodicFlushInterval;
}
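The async flush task itself is outside the hunks shown here. Purely as a hedged sketch of how a consumer of getPeriodicFlushInterval() might behave, the class below re-reads the dynamic interval on every cycle and stays idle while it is -1; all names are hypothetical and plain java.util.concurrent scheduling is assumed rather than the actual OpenSearch scheduler:

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexSettings;

/**
 * Hypothetical illustration only: a self-rescheduling task that consults the dynamic
 * periodic flush interval on every cycle and skips flushing while the interval is -1.
 */
final class PeriodicFlushTaskSketch implements Runnable {
    private final IndexSettings indexSettings;
    private final ScheduledExecutorService scheduler;
    private final Runnable flushAction; // whatever actually triggers the shard flush

    PeriodicFlushTaskSketch(IndexSettings indexSettings, ScheduledExecutorService scheduler, Runnable flushAction) {
        this.indexSettings = indexSettings;
        this.scheduler = scheduler;
        this.flushAction = flushAction;
    }

    @Override
    public void run() {
        TimeValue interval = indexSettings.getPeriodicFlushInterval();
        if (interval.millis() > 0) {
            flushAction.run();
        }
        // Reschedule with the latest value so dynamic setting updates take effect without a restart;
        // while disabled (-1), probe again after a fixed delay instead of flushing.
        long delayMillis = interval.millis() > 0 ? interval.millis() : TimeValue.timeValueMinutes(1).millis();
        scheduler.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
    }
}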

/**
* Returns the transaction log threshold size when to forcefully flush the index and clear the transaction log.
*/