diff --git a/CHANGELOG.md b/CHANGELOG.md index cdea7c564ade0..441bb9f57a5aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add build-tooling to run in FIPS environment ([#18921](https://github.com/opensearch-project/OpenSearch/pull/18921)) - Add SMILE/CBOR/YAML document format support to Bulk GRPC endpoint ([#19744](https://github.com/opensearch-project/OpenSearch/pull/19744)) - Implement GRPC ConstantScoreQuery, FuzzyQuery, MatchBoolPrefixQuery, MatchPhrasePrefix, PrefixQuery, MatchQuery ([#19854](https://github.com/opensearch-project/OpenSearch/pull/19854)) +- Add async periodic flush task support for pull-based ingestion ([#19878](https://github.com/opensearch-project/OpenSearch/pull/19878)) ### Changed - Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350)) diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java index 7138dcaba8c15..e8c9193c0d2fd 100644 --- a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java +++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java @@ -653,4 +653,131 @@ private boolean validateOffsetBasedLagForPrimaryAndReplica(long expectedLag) { && shardTypeToStats.get("replica").getConsumerStats().pointerBasedLag() == expectedLag; return valid; } + + public void testAllActiveIngestionBatchStartPointerOnReplicaPromotion() throws Exception { + // Step 1: Publish 10 messages + for (int i = 1; i <= 10; i++) { + produceDataWithExternalVersion(String.valueOf(i), 1, "name" + i, "25", defaultMessageTimestamp, "index"); + } + + // Step 2: Start nodes + 
internalCluster().startClusterManagerOnlyNode(); + final String nodeA = internalCluster().startDataOnlyNode(); + + // Step 3: Create all-active index + createIndex( + indexName, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("ingestion_source.type", "kafka") + .put("ingestion_source.pointer.init.reset", "earliest") + .put("ingestion_source.param.topic", topicName) + .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) + .put("ingestion_source.all_active", true) + .build(), + "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}" + ); + + ensureGreen(indexName); + + // Step 4: Wait for 10 messages to be searchable on nodeA + waitForSearchableDocs(10, Arrays.asList(nodeA)); + + // Step 5: Flush to persist data + flush(indexName); + + // Step 6: Add second node + final String nodeB = internalCluster().startDataOnlyNode(); + + // Step 7: Relocate shard from nodeA to nodeB + client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(indexName, 0, nodeA, nodeB)).get(); + ensureGreen(indexName); + assertTrue(nodeB.equals(primaryNodeName(indexName))); + + // Step 8: Publish 1 new message + produceDataWithExternalVersion("11", 1, "name11", "25", defaultMessageTimestamp, "index"); + + // Step 9: Wait for 11 messages to be visible on nodeB + waitForSearchableDocs(11, Arrays.asList(nodeB)); + + // Step 10: Flush to persist data + flush(indexName); + + // Step 11: Validate processed messages and version conflict count on nodeB + PollingIngestStats nodeBStats = client(nodeB).admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0] + .getPollingIngestStats(); + assertNotNull(nodeBStats); + assertEquals(2L, nodeBStats.getMessageProcessorStats().totalProcessedCount()); + assertEquals(1L, nodeBStats.getMessageProcessorStats().totalVersionConflictsCount()); + + // Step 12: Add third node + final String nodeC 
= internalCluster().startDataOnlyNode(); + + // Step 13: Bring down nodeA so the new replica will be allocated to nodeC + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA)); + + // Step 14: Add a replica (will be allocated to nodeC since only nodeB and nodeC are available) + client().admin() + .indices() + .prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + .get(); + ensureGreen(indexName); + + // Step 15: Wait for 11 messages to be searchable on nodeC (replica) + waitForSearchableDocs(11, Arrays.asList(nodeC)); + + // Step 16: Bring down nodeB (primary) and wait for nodeC to become primary + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeB)); + ensureYellowAndNoInitializingShards(indexName); + assertTrue(nodeC.equals(primaryNodeName(indexName))); + + // Step 17: Publish 1 more message + produceDataWithExternalVersion("12", 1, "name12", "25", defaultMessageTimestamp, "index"); + + // Step 18: Wait for 12 messages to be visible on nodeC + waitForSearchableDocs(12, Arrays.asList(nodeC)); + + // Step 19: Validate processed messages and version conflict count on nodeC + PollingIngestStats nodeCStats = client(nodeC).admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0] + .getPollingIngestStats(); + assertNotNull(nodeCStats); + + assertEquals(2L, nodeCStats.getMessageProcessorStats().totalProcessedCount()); + assertEquals(1L, nodeCStats.getMessageProcessorStats().totalVersionConflictsCount()); + } + + public void testAllActiveIngestionPeriodicFlush() throws Exception { + // Publish 10 messages + for (int i = 1; i <= 10; i++) { + produceData(String.valueOf(i), "name" + i, "25"); + } + + // Start nodes + internalCluster().startClusterManagerOnlyNode(); + final String nodeA = internalCluster().startDataOnlyNode(); + + // Create all-active index with 5 second periodic flush interval + createIndex( + indexName, + Settings.builder() + 
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("ingestion_source.type", "kafka") + .put("ingestion_source.pointer.init.reset", "earliest") + .put("ingestion_source.param.topic", topicName) + .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) + .put("ingestion_source.all_active", true) + .put("index.periodic_flush_interval", "5s") + .build(), + "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}" + ); + + ensureGreen(indexName); + + waitForSearchableDocs(10, Arrays.asList(nodeA)); + waitForState(() -> getPeriodicFlushCount(nodeA, indexName) >= 1); + + } } diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java index 8c9110a86db70..888156ffd1272 100644 --- a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java +++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java @@ -222,4 +222,17 @@ protected void setWriteBlock(String indexName, boolean isWriteBlockEnabled) { .setSettings(Settings.builder().put("index.blocks.write", isWriteBlockEnabled)) .get(); } + + /** + * Gets the periodic flush count for the specified index from the specified node. 
+ * + * @param nodeName the name of the node to query + * @param indexName the name of the index + * @return the periodic flush count + */ + protected long getPeriodicFlushCount(String nodeName, String indexName) { + return client(nodeName).admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0].getStats() + .getFlush() + .getPeriodic(); + } } diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java index 78a20f24274f1..93e8d073834f7 100644 --- a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java +++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java @@ -813,4 +813,100 @@ private void verifyRemoteStoreEnabled(String node) { String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled"); assertEquals("Remote store should be enabled", "true", remoteStoreEnabled); } + + public void testBatchStartPointerOnReplicaPromotion() throws Exception { + // Step 1: Publish 10 messages + for (int i = 1; i <= 10; i++) { + produceDataWithExternalVersion(String.valueOf(i), 1, "name" + i, "25", defaultMessageTimestamp, "index"); + } + + // Step 2: Start nodes + internalCluster().startClusterManagerOnlyNode(); + final String nodeA = internalCluster().startDataOnlyNode(); + + // Step 3: Create index with 1 replica + createIndex( + indexName, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put("ingestion_source.type", "kafka") + .put("ingestion_source.pointer.init.reset", "earliest") + .put("ingestion_source.param.topic", topicName) + .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) + .put("index.replication.type", "SEGMENT") + 
.build(), + "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}" + ); + + ensureYellowAndNoInitializingShards(indexName); + + // Step 4: Add second node and verify green status + final String nodeB = internalCluster().startDataOnlyNode(); + ensureGreen(indexName); + + // Step 5: Verify nodeA has the primary shard + assertTrue(nodeA.equals(primaryNodeName(indexName))); + assertTrue(nodeB.equals(replicaNodeName(indexName))); + verifyRemoteStoreEnabled(nodeA); + verifyRemoteStoreEnabled(nodeB); + + // Step 6: Wait for 10 messages to be searchable on both nodes + waitForSearchableDocs(10, Arrays.asList(nodeA, nodeB)); + + // Step 7: Flush to persist data + flush(indexName); + + // Step 8: Bring down nodeA (primary) and wait for nodeB to become primary + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA)); + ensureYellowAndNoInitializingShards(indexName); + assertTrue(nodeB.equals(primaryNodeName(indexName))); + + // Step 9: Publish 1 new message + produceDataWithExternalVersion("11", 1, "name11", "25", defaultMessageTimestamp, "index"); + + // Step 10: Wait for 11 messages to be visible on nodeB + waitForSearchableDocs(11, Arrays.asList(nodeB)); + + // Step 11: Validate version conflict count is exactly 1 + PollingIngestStats finalStats = client(nodeB).admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0] + .getPollingIngestStats(); + assertNotNull(finalStats); + + assertEquals(1L, finalStats.getMessageProcessorStats().totalVersionConflictsCount()); + assertEquals(2L, finalStats.getMessageProcessorStats().totalProcessedCount()); + } + + public void testPeriodicFlush() throws Exception { + // Publish 10 messages + for (int i = 1; i <= 10; i++) { + produceData(String.valueOf(i), "name" + i, "25"); + } + + // Start nodes + internalCluster().startClusterManagerOnlyNode(); + final String nodeA = internalCluster().startDataOnlyNode(); + + // Create index with 5 second periodic flush interval + 
createIndex( + indexName, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("ingestion_source.type", "kafka") + .put("ingestion_source.pointer.init.reset", "earliest") + .put("ingestion_source.param.topic", topicName) + .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) + .put("index.replication.type", "SEGMENT") + .put("index.periodic_flush_interval", "5s") + .build(), + "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}" + ); + + ensureGreen(indexName); + verifyRemoteStoreEnabled(nodeA); + + waitForSearchableDocs(10, Arrays.asList(nodeA)); + waitForState(() -> getPeriodicFlushCount(nodeA, indexName) >= 1); + } } diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 775c114c61389..9f4efcdbc25e0 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -147,6 +147,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING, IndexSettings.INDEX_WARMER_ENABLED_SETTING, IndexSettings.INDEX_REFRESH_INTERVAL_SETTING, + IndexSettings.INDEX_PERIODIC_FLUSH_INTERVAL_SETTING, IndexSettings.MAX_RESULT_WINDOW_SETTING, IndexSettings.MAX_INNER_RESULT_WINDOW_SETTING, IndexSettings.MAX_TOKEN_COUNT_SETTING, diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 349875f2a2a85..4f6198b42ec03 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -380,6 +380,28 @@ public static IndexMergePolicy fromString(String text) { Property.Dynamic, Property.IndexScope ); + + /** + * 
Periodic flush interval setting. By default, periodic flush is disabled (-1). + * For pull-based ingestion indices, this defaults to 10 minutes to ensure offsets are regularly committed. + */ + public static final TimeValue DEFAULT_PERIODIC_FLUSH_INTERVAL = TimeValue.MINUS_ONE; + public static final TimeValue MINIMUM_PERIODIC_FLUSH_INTERVAL = TimeValue.MINUS_ONE; + public static final Setting<TimeValue> INDEX_PERIODIC_FLUSH_INTERVAL_SETTING = Setting.timeSetting( + "index.periodic_flush_interval", + (settings) -> { + // Default to 10 minutes for pull-based ingestion indices, disabled otherwise + String ingestionSourceType = IndexMetadata.INGESTION_SOURCE_TYPE_SETTING.get(settings); + if (ingestionSourceType != null && !IndexMetadata.NONE_INGESTION_SOURCE_TYPE.equals(ingestionSourceType)) { + return TimeValue.timeValueMinutes(10); + } + return DEFAULT_PERIODIC_FLUSH_INTERVAL; + }, + MINIMUM_PERIODIC_FLUSH_INTERVAL, + Property.Dynamic, + Property.IndexScope + ); + public static final Setting<ByteSizeValue> INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING = Setting.byteSizeSetting( "index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), @@ -839,6 +861,7 @@ public static IndexMergePolicy fromString(String text) { private volatile TimeValue syncInterval; private volatile TimeValue publishReferencedSegmentsInterval; private volatile TimeValue refreshInterval; + private volatile TimeValue periodicFlushInterval; private volatile ByteSizeValue flushThresholdSize; private volatile TimeValue translogRetentionAge; private volatile ByteSizeValue translogRetentionSize; @@ -1057,6 +1080,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings); publishReferencedSegmentsInterval = INDEX_PUBLISH_REFERENCED_SEGMENTS_INTERVAL_SETTING.get(settings); refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING); + periodicFlushInterval = 
scopedSettings.get(INDEX_PERIODIC_FLUSH_INTERVAL_SETTING); flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING); generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING); flushAfterMergeThresholdSize = scopedSettings.get(INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING); @@ -1205,6 +1229,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_AGE_SETTING, this::setTranslogRetentionAge); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_SIZE_SETTING, this::setTranslogRetentionSize); scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval); + scopedSettings.addSettingsUpdateConsumer(INDEX_PERIODIC_FLUSH_INTERVAL_SETTING, this::setPeriodicFlushInterval); scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners); scopedSettings.addSettingsUpdateConsumer(MAX_ANALYZED_OFFSET_SETTING, this::setHighlightMaxAnalyzedOffset); scopedSettings.addSettingsUpdateConsumer(MAX_TERMS_COUNT_SETTING, this::setMaxTermsCount); @@ -1302,6 +1327,10 @@ private void setRefreshInterval(TimeValue timeValue) { this.refreshInterval = timeValue; } + private void setPeriodicFlushInterval(TimeValue timeValue) { + this.periodicFlushInterval = timeValue; + } + /** * Update the default maxMergesAtOnce * 1. sets the new default in {@code TieredMergePolicyProvider} @@ -1644,6 +1673,13 @@ public TimeValue getRefreshInterval() { return refreshInterval; } + /** + * Returns the interval at which a periodic flush should be executed. {@code -1} means periodic flush is disabled. + */ + public TimeValue getPeriodicFlushInterval() { + return periodicFlushInterval; + } + /** * Returns the transaction log threshold size when to forcefully flush the index and clear the transaction log. 
*/ diff --git a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java index e6e4729915c87..6393a91a8d671 100644 --- a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java @@ -61,6 +61,7 @@ public class IngestionEngine extends InternalEngine { private StreamPoller streamPoller; private final IngestionConsumerFactory ingestionConsumerFactory; private final DocumentMapperForType documentMapperForType; + private volatile IngestionShardPointer lastCommittedBatchStartPointer; public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory ingestionConsumerFactory) { super(engineConfig); @@ -345,6 +346,7 @@ public Translog.Snapshot newChangesSnapshot( protected void commitIndexWriter(final IndexWriter writer, final String translogUUID) throws IOException { try { final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); + final IngestionShardPointer batchStartPointer = streamPoller.getBatchStartPointer(); writer.setLiveCommitData(() -> { /* * The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes @@ -368,8 +370,8 @@ protected void commitIndexWriter(final IndexWriter writer, final String translog * Batch start pointer can be null at index creation time, if flush is called before the stream * poller has been completely initialized. 
*/ - if (streamPoller.getBatchStartPointer() != null) { - commitData.put(StreamPoller.BATCH_START, streamPoller.getBatchStartPointer().asString()); + if (batchStartPointer != null) { + commitData.put(StreamPoller.BATCH_START, batchStartPointer.asString()); } else { logger.warn("ignore null batch start pointer"); } @@ -382,6 +384,7 @@ protected void commitIndexWriter(final IndexWriter writer, final String translog }); shouldPeriodicallyFlushAfterBigMerge.set(false); writer.commit(); + lastCommittedBatchStartPointer = batchStartPointer; } catch (final Exception ex) { try { failEngine("lucene commit failed", ex); @@ -408,6 +411,34 @@ protected void commitIndexWriter(final IndexWriter writer, final String translog } } + /** + * Periodic flush is required if the batchStartPointer has changed since the last commit or there is a big merge. + */ + @Override + public boolean shouldPeriodicallyFlush() { + ensureOpen(); + + // Check if flush needed after big merge + if (shouldPeriodicallyFlushAfterBigMerge.get()) { + return true; + } + + // Check if batchStartPointer has changed since last commit + IngestionShardPointer currentBatchStartPointer = streamPoller.getBatchStartPointer(); + + // If current pointer is null, no flush needed + if (currentBatchStartPointer == null) { + return false; + } + + // If this is the first commit or pointer has changed, flush is needed + if (lastCommittedBatchStartPointer == null) { + return true; + } + + return currentBatchStartPointer.equals(lastCommittedBatchStartPointer) == false; + } + @Override public void activateThrottling() { // TODO: add this when we have a thread pool for indexing in parallel @@ -418,11 +449,6 @@ public void deactivateThrottling() { // TODO: is this needed? 
} - @Override - public void maybePruneDeletes() { - // no need to prune deletes in ingestion engine - } - @Override public void close() throws IOException { if (streamPoller != null) { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 2c2cc01587131..dbed65eecce47 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -387,6 +387,7 @@ Runnable getGlobalCheckpointSyncer() { private final Supplier<TimeValue> refreshInterval; private final Object refreshMutex; private volatile AsyncShardRefreshTask refreshTask; + private volatile AsyncShardFlushTask periodicFlushTask; private final ClusterApplierService clusterApplierService; private final MergedSegmentPublisher mergedSegmentPublisher; private final ReferencedSegmentsPublisher referencedSegmentsPublisher; @@ -2332,7 +2333,14 @@ public void close(String reason, boolean flushEngine, boolean deleted) throws IO } finally { // playing safe here and close the engine even if the above succeeds - close can be called multiple times // Also closing refreshListeners to prevent us from accumulating any more listeners - IOUtils.close(engine, globalCheckpointListeners, refreshListeners, pendingReplicationActions, refreshTask); + IOUtils.close( + engine, + globalCheckpointListeners, + refreshListeners, + pendingReplicationActions, + refreshTask, + periodicFlushTask + ); if (deleted && engine != null && isPrimaryMode()) { // Translog Clean up @@ -2932,6 +2940,9 @@ private Map<String, String> fetchUserData() throws IOException { private void onNewEngine(Engine newEngine) { assert Thread.holdsLock(engineMutex); refreshListeners.setCurrentRefreshLocationSupplier(newEngine.translogManager()::getTranslogLastWriteLocation); + + // Start periodic flush task for this shard + startPeriodicFlushTask(); } /** @@ -5793,4 +5804,66 @@ public AsyncShardRefreshTask getRefreshTask() { 
return refreshTask; } + public void startPeriodicFlushTask() { + TimeValue interval = indexSettings.getPeriodicFlushInterval(); + // Only start the async flush task if interval is >0 and task is not already running + if (interval.millis() > 0 && periodicFlushTask == null) { + periodicFlushTask = new AsyncShardFlushTask(this, interval); + logger.info("Started periodic flush task for shard [{}] with interval [{}]", shardId, interval); + } + } + + // Visible for testing + AsyncShardFlushTask getPeriodicFlushTask() { + return periodicFlushTask; + } + + /** + * Async shard flush task to call flush at a regular interval. + * This is particularly useful for pull-based ingestion index without translog to ensure offsets are regularly committed. + */ + final class AsyncShardFlushTask extends AbstractAsyncTask { + + private final IndexShard indexShard; + private final Logger logger; + + public AsyncShardFlushTask(IndexShard indexShard, TimeValue interval) { + super(indexShard.logger, indexShard.threadPool, interval, true); + this.logger = indexShard.logger; + this.indexShard = indexShard; + rescheduleIfNecessary(); + } + + @Override + protected boolean mustReschedule() { + // Only schedule for open shards with a valid interval + return indexShard.state != IndexShardState.CLOSED + && indexShard.indexSettings.getIndexMetadata().getState() == IndexMetadata.State.OPEN + && getInterval().millis() > 0; + } + + @Override + protected void runInternal() { + // Only execute if no other flush/roll is running to prevent concurrent flushes + if (indexShard.flushOrRollRunning.compareAndSet(false, true)) { + try { + indexShard.flush(new FlushRequest()); + indexShard.periodicFlushMetric.inc(); + } finally { + indexShard.flushOrRollRunning.set(false); + } + } + } + + @Override + protected String getThreadPool() { + return ThreadPool.Names.FLUSH; + } + + @Override + public String toString() { + return "shard_periodic_flush"; + } + } + } diff --git 
a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java index 9a121200bf6ce..91baba197a3e5 100644 --- a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java @@ -1097,4 +1097,77 @@ public void testDerivedSourceTranslogReadPreferenceValidation() { settings = new IndexSettings(metadata, Settings.EMPTY); assertFalse(settings.isDerivedSourceEnabledForTranslog()); } + + public void testDefaultPeriodicFlushIntervalForRegularIndex() { + // Test that regular indices have periodic flush disabled by default + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "test-uuid") + .build(); + + TimeValue defaultValue = IndexSettings.INDEX_PERIODIC_FLUSH_INTERVAL_SETTING.get(indexSettings); + assertEquals(TimeValue.MINUS_ONE, defaultValue); + } + + public void testDefaultPeriodicFlushIntervalForPullBasedIngestionIndex() { + // Test that ingestion indices have periodic flush enabled by default + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "test-uuid") + .put(IndexMetadata.SETTING_INGESTION_SOURCE_TYPE, "kafka") + .build(); + + TimeValue defaultValue = IndexSettings.INDEX_PERIODIC_FLUSH_INTERVAL_SETTING.get(indexSettings); + assertEquals(TimeValue.timeValueMinutes(10), defaultValue); + } + + public void testPeriodicFlushIntervalExplicitValue() { + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "test-uuid") + .put(IndexMetadata.SETTING_INGESTION_SOURCE_TYPE, "kafka") + .put(IndexSettings.INDEX_PERIODIC_FLUSH_INTERVAL_SETTING.getKey(), "5m") + .build(); + + TimeValue value = 
IndexSettings.INDEX_PERIODIC_FLUSH_INTERVAL_SETTING.get(indexSettings); + assertEquals(TimeValue.timeValueMinutes(5), value); + } + + public void testPeriodicFlushIntervalDynamicUpdate() { + IndexMetadata metadata = newIndexMeta( + "index", + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INGESTION_SOURCE_TYPE, "kafka") + .build() + ); + IndexSettings settings = newIndexSettings(metadata, Settings.EMPTY); + + // Verify default value + assertEquals(TimeValue.timeValueMinutes(10), settings.getPeriodicFlushInterval()); + + // Update to 10 minutes + settings.updateIndexMetadata( + newIndexMeta( + "index", + Settings.builder() + .put(IndexMetadata.SETTING_INGESTION_SOURCE_TYPE, "kafka") + .put(IndexSettings.INDEX_PERIODIC_FLUSH_INTERVAL_SETTING.getKey(), "10m") + .build() + ) + ); + assertEquals(TimeValue.timeValueMinutes(10), settings.getPeriodicFlushInterval()); + + // Update to disabled (-1) + settings.updateIndexMetadata( + newIndexMeta( + "index", + Settings.builder() + .put(IndexMetadata.SETTING_INGESTION_SOURCE_TYPE, "kafka") + .put(IndexSettings.INDEX_PERIODIC_FLUSH_INTERVAL_SETTING.getKey(), "-1") + .build() + ) + ); + assertEquals(TimeValue.MINUS_ONE, settings.getPeriodicFlushInterval()); + } } diff --git a/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java b/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java index adefff71c5274..c9a7e52382bc6 100644 --- a/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java @@ -198,6 +198,34 @@ public void testIngestionStateUpdate() { assertEquals(6, stats.getConsumerStats().totalPolledCount()); } + public void testShouldPeriodicallyFlush() throws IOException { + // Wait for messages to be ingested first so batchStartPointer is set + waitForResults(ingestionEngine, 2); + + // Should flush because 
lastCommittedBatchStartPointer is null (no commit yet) + assertTrue(ingestionEngine.shouldPeriodicallyFlush()); + + // After first flush, lastCommittedBatchStartPointer is set + ingestionEngine.flush(false, true); + + // Should not flush immediately after commit since pointer hasn't changed + assertFalse(ingestionEngine.shouldPeriodicallyFlush()); + + // Publish new messages, which will advance the batch start pointer + publishData("{\"_id\":\"3\",\"_source\":{\"name\":\"john\", \"age\": 30}}"); + publishData("{\"_id\":\"4\",\"_source\":{\"name\":\"jane\", \"age\": 25}}"); + waitForResults(ingestionEngine, 4); + + // Should flush because batchStartPointer has changed since last commit + assertTrue(ingestionEngine.shouldPeriodicallyFlush()); + + // Flush again + ingestionEngine.flush(false, true); + + // Should not flush immediately after commit + assertFalse(ingestionEngine.shouldPeriodicallyFlush()); + } + private IngestionEngine buildIngestionEngine( AtomicLong globalCheckpoint, Store store, diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 173e17b884536..3ab964d2baf99 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -5237,4 +5237,89 @@ private static void assertRemoteSegmentStats( assertTrue(remoteSegmentStats.getTotalRejections() > 0); assertEquals(remoteSegmentTransferTracker.getRejectionCount(), remoteSegmentStats.getTotalRejections()); } + + public void testPeriodicFlushTaskNotStartedForRegularIndex() throws Exception { + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + + IndexMetadata metadata = IndexMetadata.builder("test") + .putMapping("{ \"properties\": { \"foo\": { 
\"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1) + .build(); + + IndexShard primary = newShard(new ShardId(metadata.getIndex(), 0), true, "n1", metadata, null); + recoverShardFromStore(primary); + + // Periodic flush task should not be started + assertNull(primary.getPeriodicFlushTask()); + closeShards(primary); + } + + public void testPeriodicFlushTaskStartedForIngestionIndex() throws Exception { + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_INGESTION_SOURCE_TYPE, "kafka") + .put(IndexSettings.INDEX_PERIODIC_FLUSH_INTERVAL_SETTING.getKey(), "1m") + .build(); + + IndexMetadata metadata = IndexMetadata.builder("test") + .putMapping("{ \"properties\": { \"foo\": { \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1) + .build(); + + IndexShard primary = newShard(new ShardId(metadata.getIndex(), 0), true, "n1", metadata, null); + recoverShardFromStore(primary); + + // Periodic flush task should be started + assertNotNull(primary.getPeriodicFlushTask()); + assertEquals(TimeValue.timeValueMinutes(1), primary.getPeriodicFlushTask().getInterval()); + assertFalse(primary.getPeriodicFlushTask().isClosed()); + + closeShards(primary); + + // Task should be closed after shard close + assertTrue(primary.getPeriodicFlushTask().isClosed()); + } + + public void testPeriodicFlushTaskExecutesFlush() throws Exception { + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_INGESTION_SOURCE_TYPE, "kafka") + .put(IndexSettings.INDEX_PERIODIC_FLUSH_INTERVAL_SETTING.getKey(), "1s") + .build(); + + IndexMetadata metadata = IndexMetadata.builder("test") + .putMapping("{ \"properties\": { \"foo\": 
{ \"type\": \"text\"}}}") + .settings(settings) + .primaryTerm(0, 1) + .build(); + + IndexShard primary = newShard(new ShardId(metadata.getIndex(), 0), true, "n1", metadata, null); + recoverShardFromStore(primary); + indexDoc(primary, "_doc", "0", "{\"foo\" : \"bar\"}"); + + // Get initial flush count + long initialPeriodicFlushCount = primary.flushStats().getPeriodic(); + + // Execute the periodic flush task + IndexShard.AsyncShardFlushTask flushTask = primary.getPeriodicFlushTask(); + assertNotNull(flushTask); + + // Execute the flush task + flushTask.run(); + + // Verify periodic flush count increased + assertEquals(initialPeriodicFlushCount + 1, primary.flushStats().getPeriodic()); + + closeShards(primary); + } }