diff --git a/CHANGELOG.md b/CHANGELOG.md index 4258ee02c0b5d..534799b9a2efd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,8 +10,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add a mapper for context aware segments grouping criteria ([#19233](https://github.com/opensearch-project/OpenSearch/pull/19233)) - Return full error for GRPC error response ([#19568](https://github.com/opensearch-project/OpenSearch/pull/19568)) - Add pluggable gRPC interceptors with explicit ordering([#19005](https://github.com/opensearch-project/OpenSearch/pull/19005)) - - Add metrics for the merged segment warmer feature ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929)) +- Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635)) ### Changed - Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350)) diff --git a/plugins/ingestion-fs/src/main/java/org/opensearch/plugin/ingestion/fs/FilePartitionConsumer.java b/plugins/ingestion-fs/src/main/java/org/opensearch/plugin/ingestion/fs/FilePartitionConsumer.java index 73b750dcc1da7..6fe3c8799113c 100644 --- a/plugins/ingestion-fs/src/main/java/org/opensearch/plugin/ingestion/fs/FilePartitionConsumer.java +++ b/plugins/ingestion-fs/src/main/java/org/opensearch/plugin/ingestion/fs/FilePartitionConsumer.java @@ -62,7 +62,6 @@ public FilePartitionConsumer(FileSourceConfig config, int shardId) { public List> readNext(FileOffset offset, boolean includeStart, long maxMessages, int timeoutMillis) throws TimeoutException { long startLine = includeStart ? offset.getLine() : offset.getLine() + 1; - lastReadLine = startLine; return readFromFile(startLine, maxMessages); } @@ -155,6 +154,22 @@ public int getShardId() { return shardId; } + @Override + public long getPointerBasedLag(IngestionShardPointer expectedStartPointer) { + if (!shardFile.exists()) { + return 0; + } + + FileOffset latestOffset = (FileOffset) latestPointer(); + if (lastReadLine < 0) { + // Haven't read anything yet, use the expected start pointer + long startLine = ((FileOffset) expectedStartPointer).getLine(); + return Math.max(0, latestOffset.getLine() - startLine); + } + // return lag as number of remaining lines from lastReadLineNumber + return latestOffset.getLine() - lastReadLine - 1; + } + @Override public void close() throws IOException { if (reader != null) { diff --git a/plugins/ingestion-fs/src/test/java/org/opensearch/plugin/ingestion/fs/FileBasedIngestionSingleNodeTests.java b/plugins/ingestion-fs/src/test/java/org/opensearch/plugin/ingestion/fs/FileBasedIngestionSingleNodeTests.java index 30e28d4d11f8c..555b0d0152e21 100644 --- a/plugins/ingestion-fs/src/test/java/org/opensearch/plugin/ingestion/fs/FileBasedIngestionSingleNodeTests.java +++ b/plugins/ingestion-fs/src/test/java/org/opensearch/plugin/ingestion/fs/FileBasedIngestionSingleNodeTests.java @@ -9,6 +9,8 @@ package org.opensearch.plugin.ingestion.fs; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.admin.indices.stats.IndexStats; +import org.opensearch.action.admin.indices.stats.ShardStats; import org.opensearch.action.admin.indices.streamingingestion.pause.PauseIngestionResponse; import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionRequest; import org.opensearch.action.admin.indices.streamingingestion.resume.ResumeIngestionResponse; @@ -17,6 +19,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.index.query.RangeQueryBuilder; +import org.opensearch.indices.pollingingest.PollingIngestStats; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchSingleNodeTestCase; import org.opensearch.transport.client.Requests; @@ -31,6 +34,8 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; public class FileBasedIngestionSingleNodeTests extends OpenSearchSingleNodeTestCase { private Path ingestionDir; @@ -237,4 +242,167 @@ public void testFileIngestionFromProvidedPointer() throws Exception { // cleanup the test index client().admin().indices().delete(new DeleteIndexRequest(index)).actionGet(); } + + public void testPointerBasedLag() throws Exception { + String mappings = """ + { + "properties": { + "name": { "type": "text" }, + "age": { "type": "integer" } + } + } + """; + + // Create index with empty file (no messages) + Path streamDir = ingestionDir.resolve(stream); + Path shardFile = streamDir.resolve("0.ndjson"); + Files.write(shardFile, new byte[0]); // Empty file + + createIndexWithMappingSource( + index, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("ingestion_source.type", "FILE") + .put("ingestion_source.pointer.init.reset", "earliest") + .put("ingestion_source.pointer_based_lag_update_interval", "3s") + .put("ingestion_source.param.stream", stream) + .put("ingestion_source.param.base_directory", ingestionDir.toString()) + .put("index.replication.type", "SEGMENT") + .build(), + mappings + ); + ensureGreen(index); + + // Lag should be 0 since there are no messages + waitForState(() -> { + PollingIngestStats stats = getPollingIngestStats(index); + return stats != null && stats.getConsumerStats().pointerBasedLag() == 0L; + }); + + // Add messages to the file + try ( + BufferedWriter writer = Files.newBufferedWriter( + shardFile, + StandardCharsets.UTF_8, + StandardOpenOption.WRITE, + StandardOpenOption.TRUNCATE_EXISTING + ) + ) { + writer.write("{\"_id\":\"1\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"alice\", \"age\": 30}}\n"); + writer.write("{\"_id\":\"2\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"bob\", \"age\": 35}}\n"); + writer.flush(); + } + + try (FileChannel channel = FileChannel.open(shardFile, StandardOpenOption.READ)) { + channel.force(true); + } + + // Wait for messages to be processed + waitForState(() -> { + SearchResponse response = client().prepareSearch(index).setQuery(new RangeQueryBuilder("age").gte(0)).get(); + return response.getHits().getTotalHits().value() == 2; + }); + + // Lag should be 0 after all messages are consumed + waitForState(() -> { + PollingIngestStats stats = getPollingIngestStats(index); + return stats != null && stats.getConsumerStats().pointerBasedLag() == 0L; + }); + + // cleanup + client().admin().indices().delete(new DeleteIndexRequest(index)).actionGet(); + } + + public void testPointerBasedLagAfterPause() throws Exception { + String mappings = """ + { + "properties": { + "name": { "type": "text" }, + "age": { "type": "integer" } + } + } + """; + + createIndexWithMappingSource( + index, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("ingestion_source.type", "FILE") + .put("ingestion_source.pointer.init.reset", "earliest") + .put("ingestion_source.pointer_based_lag_update_interval", "3s") + .put("ingestion_source.param.stream", stream) + .put("ingestion_source.param.base_directory", ingestionDir.toString()) + .put("index.replication.type", "SEGMENT") + .build(), + mappings + ); + ensureGreen(index); + + // Wait for initial messages to be processed + waitForState(() -> { + SearchResponse response = client().prepareSearch(index).setQuery(new RangeQueryBuilder("age").gte(0)).get(); + return response.getHits().getTotalHits().value() == 2; + }); + + // Pause ingestion + PauseIngestionResponse pauseResponse = client().admin().indices().pauseIngestion(Requests.pauseIngestionRequest(index)).get(); + assertTrue(pauseResponse.isAcknowledged()); + assertTrue(pauseResponse.isShardsAcknowledged()); + + // Wait for pause to take effect + waitForState(() -> { + GetIngestionStateResponse ingestionState = client().admin() + .indices() + .getIngestionState(Requests.getIngestionStateRequest(index)) + .get(); + return ingestionState.getFailedShards() == 0 + && Arrays.stream(ingestionState.getShardStates()) + .allMatch(state -> state.isPollerPaused() && state.getPollerState().equalsIgnoreCase("paused")); + }); + + // Add more messages to the file while paused + Path streamDir = ingestionDir.resolve(stream); + Path shardFile = streamDir.resolve("0.ndjson"); + try (BufferedWriter writer = Files.newBufferedWriter(shardFile, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) { + writer.write("{\"_id\":\"3\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"charlie\", \"age\": 40}}\n"); + writer.write("{\"_id\":\"4\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"diana\", \"age\": 45}}\n"); + writer.write("{\"_id\":\"5\",\"_version\":\"1\",\"_op_type\":\"index\",\"_source\":{\"name\":\"eve\", \"age\": 50}}\n"); + writer.flush(); + } + + try (FileChannel channel = FileChannel.open(shardFile, StandardOpenOption.READ)) { + channel.force(true); + } + + // Wait for lag to be calculated (lag is updated every 3 seconds in this test) + waitForState(() -> { + PollingIngestStats stats = getPollingIngestStats(index); + return stats != null && stats.getConsumerStats().pointerBasedLag() == 3L; + }); + + // cleanup + client().admin().indices().delete(new DeleteIndexRequest(index)).actionGet(); + } + + /** + * Helper method to get polling ingest stats for the index + */ + private PollingIngestStats getPollingIngestStats(String indexName) { + IndexStats indexStats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName); + ShardStats[] shards = indexStats.getShards(); + if (shards.length > 0) { + return shards[0].getPollingIngestStats(); + } + return null; + } + + private void waitForState(Callable checkState) throws Exception { + assertBusy(() -> { + if (checkState.call() == false) { + fail("Provided state requirements not met"); + } + }, 1, TimeUnit.MINUTES); + } } diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java index f82058276d98c..7138dcaba8c15 100644 --- a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java +++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java @@ -567,6 +567,66 @@ public void testResetPollerInAllActiveIngestion() throws Exception { }); } + public void testAllActiveOffsetBasedLag() throws Exception { + // Create all-active pull-based index + internalCluster().startClusterManagerOnlyNode(); + final String nodeA = internalCluster().startDataOnlyNode(); + final String nodeB = internalCluster().startDataOnlyNode(); + + createIndex( + indexName, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put("ingestion_source.type", "kafka") + .put("ingestion_source.param.topic", topicName) + .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) + .put("ingestion_source.pointer.init.reset", "earliest") + .put("ingestion_source.pointer_based_lag_update_interval", "3s") + .put("ingestion_source.all_active", true) + .build(), + "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}" + ); + + ensureGreen(indexName); + // no messages published, expect 0 lag + assertTrue(validateOffsetBasedLagForPrimaryAndReplica(0)); + + // pause ingestion + PauseIngestionResponse pauseResponse = pauseIngestion(indexName); + assertTrue(pauseResponse.isAcknowledged()); + assertTrue(pauseResponse.isShardsAcknowledged()); + waitForState(() -> { + GetIngestionStateResponse ingestionState = getIngestionState(indexName); + return ingestionState.getShardStates().length == 2 + && ingestionState.getFailedShards() == 0 + && Arrays.stream(ingestionState.getShardStates()) + .allMatch(state -> state.isPollerPaused() && state.getPollerState().equalsIgnoreCase("paused")); + }); + + // produce 10 messages in paused state and validate lag + for (int i = 0; i < 10; i++) { + produceData(Integer.toString(i), "name" + i, "30"); + } + waitForState(() -> validateOffsetBasedLagForPrimaryAndReplica(10)); + + // resume ingestion + ResumeIngestionResponse resumeResponse = resumeIngestion(indexName); + assertTrue(resumeResponse.isAcknowledged()); + assertTrue(resumeResponse.isShardsAcknowledged()); + waitForState(() -> { + GetIngestionStateResponse ingestionState = getIngestionState(indexName); + return ingestionState.getShardStates().length == 2 + && Arrays.stream(ingestionState.getShardStates()) + .allMatch( + state -> state.isPollerPaused() == false + && (state.getPollerState().equalsIgnoreCase("polling") || state.getPollerState().equalsIgnoreCase("processing")) + ); + }); + waitForSearchableDocs(10, List.of(nodeA, nodeB)); + waitForState(() -> validateOffsetBasedLagForPrimaryAndReplica(0)); + } + // returns PollingIngestStats for single primary and single replica private Map getPollingIngestStatsForPrimaryAndReplica(String indexName) { IndexStats indexStats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName); @@ -583,4 +643,14 @@ private Map getPollingIngestStatsForPrimaryAndReplic return shardTypeToStats; } + + private boolean validateOffsetBasedLagForPrimaryAndReplica(long expectedLag) { + boolean valid = true; + Map shardTypeToStats = getPollingIngestStatsForPrimaryAndReplica(indexName); + valid &= shardTypeToStats.get("primary") != null + && shardTypeToStats.get("primary").getConsumerStats().pointerBasedLag() == expectedLag; + valid &= shardTypeToStats.get("replica") != null + && shardTypeToStats.get("replica").getConsumerStats().pointerBasedLag() == expectedLag; + return valid; + } } diff --git a/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java b/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java index b3e4528ac609b..25a793d65c171 100644 --- a/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java +++ b/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java @@ -138,7 +138,7 @@ protected static Consumer createConsumer(String clientId, KafkaS * @param includeStart whether to include the start pointer in the read * @param maxMessages this setting is not honored for Kafka at this stage. maxMessages is instead set at consumer initialization. * @param timeoutMillis the maximum time to wait for messages - * @return + * @return the next read result * @throws TimeoutException */ @Override @@ -158,7 +158,7 @@ public List> readNext( * Read the next batch of messages from Kafka. * @param maxMessages this setting is not honored for Kafka at this stage. maxMessages is instead set at consumer initialization. * @param timeoutMillis the maximum time to wait for messages - * @return + * @return the next read result * @throws TimeoutException */ @Override @@ -255,6 +255,35 @@ public int getShardId() { return topicPartition.partition(); } + /** + * Compute Kafka offset based lag as the difference between latest available offset and last consumed offset. + * Note: This method is not thread-safe and should only be called from the poller thread to avoid multi-threaded + * access to KafkaConsumer. + * + * @param expectedStartPointer the pointer where ingestion would start if no messages have been consumed yet + * @return offset based lag. -1 is returned if errors are encountered. + */ + @Override + public long getPointerBasedLag(IngestionShardPointer expectedStartPointer) { + try { + // Get the end offset for the partition + long endOffset = consumer.endOffsets(Collections.singletonList(topicPartition)).getOrDefault(topicPartition, 0L); + + if (lastFetchedOffset < 0) { + // Haven't fetched anything yet, use the expected start pointer. + // Set lag as 0 in case expectedStartPointer is beyond endOffset. + long startOffset = ((KafkaOffset) expectedStartPointer).getOffset(); + return Math.max(0, endOffset - startOffset); + } + + // Calculate lag as difference between latest and last consumed offset + return endOffset - lastFetchedOffset - 1; + } catch (Exception e) { + logger.warn("Failed to calculate pointer based lag for partition {}: {}", topicPartition.partition(), e.getMessage()); + return -1; + } + } + @Override public void close() throws IOException { consumer.close(); diff --git a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java index 78e563b8446c4..096e2df8a7fe2 100644 --- a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java +++ b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java @@ -137,4 +137,80 @@ public void testCreateConsumer() { assertNotNull(consumer); assertEquals(KafkaConsumer.class, consumer.getClass()); } + + public void testGetPointerBasedLagBeforeAnyFetch() { + // Before any messages are fetched, lag should be calculated from expected start pointer + TopicPartition topicPartition = new TopicPartition("test-topic", 0); + when(mockConsumer.endOffsets(Collections.singletonList(topicPartition))).thenReturn(Collections.singletonMap(topicPartition, 100L)); + + // Expected start pointer is offset 0 + KafkaOffset expectedStartPointer = new KafkaOffset(0L); + long lag = consumer.getPointerBasedLag(expectedStartPointer); + + assertEquals(100L, lag); + } + + public void testGetPointerBasedLagAfterFetch() throws Exception { + // Simulate fetching messages up to offset 50 + TopicPartition topicPartition = new TopicPartition("test-topic", 0); + ConsumerRecord record1 = new ConsumerRecord<>( + "test-topic", + 0, + 50, + null, + "message".getBytes(StandardCharsets.UTF_8) + ); + ConsumerRecords records = new ConsumerRecords<>( + Collections.singletonMap(topicPartition, Collections.singletonList(record1)) + ); + + when(mockConsumer.poll(any(Duration.class))).thenReturn(records); + consumer.readNext(new KafkaOffset(50), true, 10, 1000); + + // Now endOffset is 100, lastFetchedOffset is 50 + // expected lag is 49 + when(mockConsumer.endOffsets(Collections.singletonList(topicPartition))).thenReturn(Collections.singletonMap(topicPartition, 100L)); + + // Expected start pointer doesn't matter since we've already fetched + KafkaOffset expectedStartPointer = new KafkaOffset(0L); + long lag = consumer.getPointerBasedLag(expectedStartPointer); + + assertEquals(49L, lag); + } + + public void testGetPointerBasedLagWhenCaughtUp() throws Exception { + // Simulate being caught up: lastFetchedOffset = endOffset - 1 + TopicPartition topicPartition = new TopicPartition("test-topic", 0); + ConsumerRecord record = new ConsumerRecord<>("test-topic", 0, 99, null, "message".getBytes(StandardCharsets.UTF_8)); + ConsumerRecords records = new ConsumerRecords<>( + Collections.singletonMap(topicPartition, Collections.singletonList(record)) + ); + + when(mockConsumer.poll(any(Duration.class))).thenReturn(records); + consumer.readNext(new KafkaOffset(99), true, 10, 1000); + + // endOffset is 100, lastFetchedOffset is 99 + // expected lag is 0 + when(mockConsumer.endOffsets(Collections.singletonList(topicPartition))).thenReturn(Collections.singletonMap(topicPartition, 100L)); + + // Expected start pointer doesn't matter since we've already fetched + KafkaOffset expectedStartPointer = new KafkaOffset(0L); + long lag = consumer.getPointerBasedLag(expectedStartPointer); + + assertEquals(0L, lag); + } + + public void testGetPointerBasedLagHandlesException() { + // Simulate an exception when calling endOffsets + TopicPartition topicPartition = new TopicPartition("test-topic", 0); + when(mockConsumer.endOffsets(Collections.singletonList(topicPartition))).thenThrow( + new RuntimeException("Kafka broker unavailable") + ); + + KafkaOffset expectedStartPointer = new KafkaOffset(0L); + long lag = consumer.getPointerBasedLag(expectedStartPointer); + + // Should return -1 on exception + assertEquals(-1, lag); + } } diff --git a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaSingleNodeTests.java b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaSingleNodeTests.java index eead93e7881a2..e1f88d68062ce 100644 --- a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaSingleNodeTests.java +++ b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaSingleNodeTests.java @@ -80,6 +80,7 @@ public void testPauseAndResumeAPIs() throws Exception { .put("ingestion_source.param.topic", topicName) .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) .put("index.replication.type", "SEGMENT") + .put("ingestion_source.pointer_based_lag_update_interval", "0") .build(), mappings ); diff --git a/plugins/ingestion-kinesis/src/main/java/org/opensearch/plugin/kinesis/KinesisShardConsumer.java b/plugins/ingestion-kinesis/src/main/java/org/opensearch/plugin/kinesis/KinesisShardConsumer.java index 2c88073ce8f04..92ee9e543ff86 100644 --- a/plugins/ingestion-kinesis/src/main/java/org/opensearch/plugin/kinesis/KinesisShardConsumer.java +++ b/plugins/ingestion-kinesis/src/main/java/org/opensearch/plugin/kinesis/KinesisShardConsumer.java @@ -247,6 +247,15 @@ public int getShardId() { return shardId; } + /** + * Kinesis does not support pointer based lag. Return default value of 0. + * @param expectedStartPointer the pointer where ingestion would start if no messages have been consumed yet + */ + @Override + public long getPointerBasedLag(IngestionShardPointer expectedStartPointer) { + return 0; + } + @Override public void close() throws IOException { if (kinesisClient != null) { diff --git a/plugins/ingestion-kinesis/src/test/java/org/opensearch/plugin/kinesis/KinesisShardConsumerTests.java b/plugins/ingestion-kinesis/src/test/java/org/opensearch/plugin/kinesis/KinesisShardConsumerTests.java index 441583061491c..f02dddd661b1e 100644 --- a/plugins/ingestion-kinesis/src/test/java/org/opensearch/plugin/kinesis/KinesisShardConsumerTests.java +++ b/plugins/ingestion-kinesis/src/test/java/org/opensearch/plugin/kinesis/KinesisShardConsumerTests.java @@ -111,6 +111,8 @@ public void testReadNext() throws TimeoutException { Assert.assertEquals(1, results.size()); Assert.assertEquals("12345", results.get(0).getPointer().getSequenceNumber()); + // pointer based lag is not supported, expect default 0 lag + Assert.assertEquals(0, consumer.getPointerBasedLag(new SequenceNumber("0"))); } public void testEarliestPointer() { diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index f5e0a4a8cb211..353a355837ed0 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -51,6 +51,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.Assertions; import org.opensearch.core.common.Strings; @@ -93,6 +94,7 @@ import java.util.Objects; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import static org.opensearch.cluster.metadata.Metadata.CONTEXT_MODE_PARAM; @@ -907,6 +909,20 @@ public Iterator> settings() { Setting.Property.Final ); + /** + * Defines the pointer-based lag update interval for pull-based ingestion. + * This controls how frequently the lag between the latest available message and the last consumed message is calculated. + * Setting this to 0 disables pointer-based lag calculation entirely. + */ + public static final String SETTING_INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL = + "index.ingestion_source.pointer_based_lag_update_interval"; + public static final Setting INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING = Setting.positiveTimeSetting( + SETTING_INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL, + new TimeValue(10, TimeUnit.SECONDS), + Property.IndexScope, + Property.Final + ); + /** * Defines if all-active pull-based ingestion is enabled. In this mode, replicas will directly consume from the * streaming source and process the updates. In the default document replication mode, this setting must be enabled. @@ -1210,6 +1226,7 @@ public IngestionSource getIngestionSource() { final int numProcessorThreads = INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING.get(settings); final int blockingQueueSize = INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING.get(settings); final boolean allActiveIngestionEnabled = INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.get(settings); + final TimeValue pointerBasedLagUpdateInterval = INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING.get(settings); return new IngestionSource.Builder(ingestionSourceType).setParams(ingestionSourceParams) .setPointerInitReset(pointerInitReset) @@ -1219,6 +1236,7 @@ public IngestionSource getIngestionSource() { .setNumProcessorThreads(numProcessorThreads) .setBlockingQueueSize(blockingQueueSize) .setAllActiveIngestion(allActiveIngestionEnabled) + .setPointerBasedLagUpdateInterval(pointerBasedLagUpdateInterval) .build(); } return null; diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java b/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java index a3ea7a73677f1..9a8b4bfa1baf5 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java @@ -10,6 +10,7 @@ import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.indices.pollingingest.IngestionErrorStrategy; import org.opensearch.indices.pollingingest.StreamPoller; @@ -21,6 +22,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_MAX_POLL_SIZE; import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING; +import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_POLL_TIMEOUT; /** @@ -37,6 +39,7 @@ public class IngestionSource { private int numProcessorThreads; private int blockingQueueSize; private final boolean allActiveIngestion; + private final TimeValue pointerBasedLagUpdateInterval; private IngestionSource( String type, @@ -47,7 +50,8 @@ private IngestionSource( int pollTimeout, int numProcessorThreads, int blockingQueueSize, - boolean allActiveIngestion + boolean allActiveIngestion, + TimeValue pointerBasedLagUpdateInterval ) { this.type = type; this.pointerInitReset = pointerInitReset; @@ -58,6 +62,7 @@ private IngestionSource( this.numProcessorThreads = numProcessorThreads; this.blockingQueueSize = blockingQueueSize; this.allActiveIngestion = allActiveIngestion; + this.pointerBasedLagUpdateInterval = pointerBasedLagUpdateInterval; } public String getType() { @@ -96,6 +101,10 @@ public boolean isAllActiveIngestionEnabled() { return allActiveIngestion; } + public TimeValue getPointerBasedLagUpdateInterval() { + return pointerBasedLagUpdateInterval; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -109,7 +118,8 @@ public boolean equals(Object o) { && Objects.equals(pollTimeout, ingestionSource.pollTimeout) && Objects.equals(numProcessorThreads, ingestionSource.numProcessorThreads) && Objects.equals(blockingQueueSize, ingestionSource.blockingQueueSize) - && Objects.equals(allActiveIngestion, ingestionSource.allActiveIngestion); + && Objects.equals(allActiveIngestion, ingestionSource.allActiveIngestion) + && Objects.equals(pointerBasedLagUpdateInterval, ingestionSource.pointerBasedLagUpdateInterval); } @Override @@ -123,7 +133,8 @@ public int hashCode() { pollTimeout, numProcessorThreads, blockingQueueSize, - allActiveIngestion + allActiveIngestion, + pointerBasedLagUpdateInterval ); } @@ -151,6 +162,8 @@ public String toString() { + blockingQueueSize + ", allActiveIngestion=" + allActiveIngestion + + ", pointerBasedLagUpdateInterval=" + + pointerBasedLagUpdateInterval + '}'; } @@ -209,6 +222,9 @@ public static class Builder { private int numProcessorThreads = INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING.getDefault(Settings.EMPTY); private int blockingQueueSize = INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING.getDefault(Settings.EMPTY); private boolean allActiveIngestion = INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.getDefault(Settings.EMPTY); + private TimeValue pointerBasedLagUpdateInterval = INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING.getDefault( + Settings.EMPTY + ); public Builder(String type) { this.type = type; @@ -222,6 +238,7 @@ public Builder(IngestionSource ingestionSource) { this.params = ingestionSource.params; this.blockingQueueSize = ingestionSource.blockingQueueSize; this.allActiveIngestion = ingestionSource.allActiveIngestion; + this.pointerBasedLagUpdateInterval = ingestionSource.pointerBasedLagUpdateInterval; } public Builder setPointerInitReset(PointerInitReset pointerInitReset) { @@ -269,6 +286,11 @@ public Builder setAllActiveIngestion(boolean allActiveIngestion) { return this; } + public Builder setPointerBasedLagUpdateInterval(TimeValue pointerBasedLagUpdateInterval) { + this.pointerBasedLagUpdateInterval = pointerBasedLagUpdateInterval; + return this; + } + public IngestionSource build() { return new IngestionSource( type, @@ -279,7 +301,8 @@ public IngestionSource build() { pollTimeout, numProcessorThreads, blockingQueueSize, - allActiveIngestion + allActiveIngestion, + pointerBasedLagUpdateInterval ); } diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index e34976ebeb60e..775c114c61389 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -281,6 +281,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING, IndexMetadata.INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING, IndexMetadata.INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING, + IndexMetadata.INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING, // Settings for search replica IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING, diff --git a/server/src/main/java/org/opensearch/index/IngestionShardConsumer.java b/server/src/main/java/org/opensearch/index/IngestionShardConsumer.java index e55b4f59b61fa..7188b3277fe2f 100644 --- a/server/src/main/java/org/opensearch/index/IngestionShardConsumer.java +++ b/server/src/main/java/org/opensearch/index/IngestionShardConsumer.java @@ -109,4 +109,11 @@ List> readNext(T pointer, boolean includeStart, long maxMessage * @return the shard id */ int getShardId(); + + /** + * Returns the pointer based lag for the current shard consumer. + * @param expectedStartPointer the ingestion start point on first time shard initialization. + * @return pointer based lag if available, else 0. + */ + long getPointerBasedLag(IngestionShardPointer expectedStartPointer); } diff --git a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java index 1197f6df2e75d..e6e4729915c87 100644 --- a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java @@ -145,6 +145,7 @@ private void initializeStreamPoller( .pollTimeout(ingestionSource.getPollTimeout()) .numProcessorThreads(ingestionSource.getNumProcessorThreads()) .blockingQueueSize(ingestionSource.getBlockingQueueSize()) + .pointerBasedLagUpdateInterval(ingestionSource.getPointerBasedLagUpdateInterval().millis()) .build(); registerStreamPollerListener(); diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java b/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java index e60d1e91c5183..21559ae2cb236 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java @@ -49,6 +49,8 @@ public class DefaultStreamPoller implements StreamPoller { private volatile boolean isWriteBlockEnabled; private volatile long lastPolledMessageTimestamp = 0; + private volatile long cachedPointerBasedLag = 0; + private volatile long lastPointerBasedLagUpdateTime = 0; @Nullable private IngestionShardConsumer consumer; @@ -67,6 +69,7 @@ public class DefaultStreamPoller implements StreamPoller { private long maxPollSize; private int pollTimeout; + private long pointerBasedLagUpdateIntervalMs; private final String indexName; @@ -91,7 +94,8 @@ private DefaultStreamPoller( long maxPollSize, int pollTimeout, int numProcessorThreads, - int blockingQueueSize + int blockingQueueSize, + long pointerBasedLagUpdateIntervalMs ) { this( startPointer, @@ -105,6 +109,7 @@ private DefaultStreamPoller( initialState, maxPollSize, pollTimeout, + pointerBasedLagUpdateIntervalMs, ingestionEngine.config().getIndexSettings() ); } @@ -124,6 +129,7 @@ private DefaultStreamPoller( State initialState, long maxPollSize, int pollTimeout, + long pointerBasedLagUpdateIntervalMs, IndexSettings indexSettings ) { this.consumerFactory = Objects.requireNonNull(consumerFactory); @@ -135,6 +141,7 @@ private DefaultStreamPoller( this.state = initialState; this.maxPollSize = maxPollSize; this.pollTimeout = pollTimeout; + this.pointerBasedLagUpdateIntervalMs = pointerBasedLagUpdateIntervalMs; this.blockingQueueContainer = blockingQueueContainer; this.consumerThread = Executors.newSingleThreadExecutor( r -> new Thread(r, String.format(Locale.ROOT, "stream-poller-consumer-%d-%d", shardId, System.currentTimeMillis())) @@ -192,6 +199,9 @@ protected void startPoll() { // reset the consumer offset handleResetState(); + // Update lag periodically + updatePointerBasedLagIfNeeded(); + if (paused || isWriteBlockEnabled) { state = State.PAUSED; try { @@ -363,14 +373,15 @@ public PollingIngestStats getStats() { builder.setTotalConsumerErrorCount(totalConsumerErrorCount.count()); builder.setTotalPollerMessageFailureCount(totalPollerMessageFailureCount.count()); builder.setTotalPollerMessageDroppedCount(totalPollerMessageDroppedCount.count()); - builder.setLagInMillis(computeLag()); + builder.setLagInMillis(computeTimeBasedLag()); + builder.setPointerBasedLag(cachedPointerBasedLag); return builder.build(); } /** * Returns the lag in milliseconds since the last polled message */ - private long computeLag() { + private long computeTimeBasedLag() { if (lastPolledMessageTimestamp == 0 || paused) { return 0; } @@ -384,6 +395,29 @@ private void setLastPolledMessageTimestamp(long timestamp) { } } + /** + * Update the cached pointer-based lag if enough time has elapsed since the last update. + * {@code consumer.getPointerBasedLag()} is called from the poller thread, so it's safe to access the consumer. + * If pointerBasedLagUpdateIntervalMs is 0, pointer-based lag calculation is disabled. + */ + private void updatePointerBasedLagIfNeeded() { + // If interval is 0, pointer-based lag is disabled + if (pointerBasedLagUpdateIntervalMs == 0) { + return; + } + + long currentTime = System.currentTimeMillis(); + if (consumer != null && (currentTime - lastPointerBasedLagUpdateTime >= pointerBasedLagUpdateIntervalMs)) { + try { + // update the lastPointerBasedLagUpdateTime first, to avoid load on streaming source in case of errors + lastPointerBasedLagUpdateTime = currentTime; + cachedPointerBasedLag = consumer.getPointerBasedLag(initialBatchStartPointer); + } catch (Exception e) { + logger.warn("Failed to update lag for index {} shard {}: {}", indexName, shardId, e.getMessage()); + } + } + } + public State getState() { return this.state; } @@ -498,6 +532,7 @@ public static class Builder { private int pollTimeout = 1000; private int numProcessorThreads = 1; private int blockingQueueSize = 100; + private long pointerBasedLagUpdateIntervalMs = 10000; /** * Initialize the builder with mandatory parameters @@ -581,6 +616,14 @@ public Builder blockingQueueSize(int blockingQueueSize) { return this; } + /** + * Set pointer-based lag update interval in milliseconds + */ + public Builder pointerBasedLagUpdateInterval(long pointerBasedLagUpdateInterval) { + this.pointerBasedLagUpdateIntervalMs = pointerBasedLagUpdateInterval; + return this; + } + /** * Build the DefaultStreamPoller instance */ @@ -598,7 +641,8 @@ public DefaultStreamPoller build() { maxPollSize, pollTimeout, numProcessorThreads, - blockingQueueSize + blockingQueueSize, + pointerBasedLagUpdateIntervalMs ); } } diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/PollingIngestStats.java b/server/src/main/java/org/opensearch/indices/pollingingest/PollingIngestStats.java index f0f8feac54d29..f946e3ae950db 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/PollingIngestStats.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/PollingIngestStats.java @@ -8,6 +8,7 @@ package org.opensearch.indices.pollingingest; +import org.opensearch.Version; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -52,13 +53,20 @@ public PollingIngestStats(StreamInput in) throws IOException { long totalPollerMessageFailureCount = in.readLong(); long totalPollerMessageDroppedCount = in.readLong(); long totalDuplicateMessageSkippedCount = in.readLong(); + + long pointerBasedLag = 0; + if (in.getVersion().onOrAfter(Version.V_3_4_0)) { + pointerBasedLag = in.readLong(); + } + this.consumerStats = new ConsumerStats( totalPolledCount, lagInMillis, totalConsumerErrorCount, totalPollerMessageFailureCount, totalPollerMessageDroppedCount, - totalDuplicateMessageSkippedCount + totalDuplicateMessageSkippedCount, + pointerBasedLag ); } @@ -76,6 +84,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(consumerStats.totalPollerMessageFailureCount); out.writeLong(consumerStats.totalPollerMessageDroppedCount); out.writeLong(consumerStats.totalDuplicateMessageSkippedCount); + + if (out.getVersion().onOrAfter(Version.V_3_4_0)) { + out.writeLong(consumerStats.pointerBasedLag); + } } @Override @@ -96,6 +108,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("total_poller_message_dropped_count", consumerStats.totalPollerMessageDroppedCount); builder.field("total_duplicate_message_skipped_count", consumerStats.totalDuplicateMessageSkippedCount); builder.field("lag_in_millis", consumerStats.lagInMillis); + builder.field("pointer_based_lag", consumerStats.pointerBasedLag); builder.endObject(); builder.endObject(); return builder; @@ -137,7 +150,7 @@ public record MessageProcessorStats(long totalProcessedCount, long totalInvalidM */ @ExperimentalApi public record ConsumerStats(long totalPolledCount, long lagInMillis, long totalConsumerErrorCount, long totalPollerMessageFailureCount, - long totalPollerMessageDroppedCount, long totalDuplicateMessageSkippedCount) { + long totalPollerMessageDroppedCount, long totalDuplicateMessageSkippedCount, long pointerBasedLag) { } /** @@ -157,6 +170,7 @@ public static class Builder { private long totalPollerMessageFailureCount; private long totalPollerMessageDroppedCount; private long totalDuplicateMessageSkippedCount; + private long pointerBasedLag; public Builder() {} @@ -224,6 +238,11 @@ public Builder setTotalDuplicateMessageSkippedCount(long totalDuplicateMessageSk return this; } + public Builder setPointerBasedLag(long pointerBasedLag) { + this.pointerBasedLag = pointerBasedLag; + return this; + } + public PollingIngestStats build() { MessageProcessorStats messageProcessorStats = new MessageProcessorStats( totalProcessedCount, @@ -239,7 +258,8 @@ public PollingIngestStats build() { totalConsumerErrorCount, totalPollerMessageFailureCount, totalPollerMessageDroppedCount, - totalDuplicateMessageSkippedCount + totalDuplicateMessageSkippedCount, + pointerBasedLag ); return new PollingIngestStats(messageProcessorStats, consumerStats); } diff --git a/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java index c741ed0a6d989..b83c78ecf5eee 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java @@ -8,6 +8,7 @@ package org.opensearch.cluster.metadata; +import org.opensearch.common.unit.TimeValue; import org.opensearch.indices.pollingingest.StreamPoller; import org.opensearch.test.OpenSearchTestCase; @@ -30,6 +31,7 @@ public void testConstructorAndGetters() { .setPointerInitReset(pointerInitReset) .setErrorStrategy(DROP) .setBlockingQueueSize(1000) + .setPointerBasedLagUpdateInterval(TimeValue.timeValueSeconds(1)) .build(); assertEquals("type", source.getType()); @@ -40,6 +42,7 @@ public void testConstructorAndGetters() { assertEquals(1000, source.getMaxPollSize()); assertEquals(1000, source.getPollTimeout()); assertEquals(1000, source.getBlockingQueueSize()); + assertEquals(1, source.getPointerBasedLagUpdateInterval().getSeconds()); } public void testEquals() { @@ -105,7 +108,7 @@ public void testToString() { .setErrorStrategy(DROP) .build(); String expected = - "IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='RESET_BY_OFFSET', value=1000}',error_strategy='DROP', params={key=value}, maxPollSize=1000, pollTimeout=1000, numProcessorThreads=1, blockingQueueSize=100, allActiveIngestion=false}"; + "IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='RESET_BY_OFFSET', value=1000}',error_strategy='DROP', params={key=value}, maxPollSize=1000, pollTimeout=1000, numProcessorThreads=1, blockingQueueSize=100, allActiveIngestion=false, pointerBasedLagUpdateInterval=10s}"; assertEquals(expected, source.toString()); } diff --git a/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java b/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java index c7a5c430d2595..afe9f9f5a5043 100644 --- a/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java +++ b/server/src/test/java/org/opensearch/index/engine/FakeIngestionSource.java @@ -116,6 +116,11 @@ public int getShardId() { return shardId; } + @Override + public long getPointerBasedLag(IngestionShardPointer expectedStartPointer) { + return 0; + } + @Override public void close() throws IOException { diff --git a/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java b/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java index 8e0a1f5dfc3be..1b7b3cc87609a 100644 --- a/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java +++ b/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java @@ -90,6 +90,7 @@ public void setUp() throws Exception { StreamPoller.State.NONE, 1000, 1000, + 10000, indexSettings ); partitionedBlockingQueueContainer.startProcessorThreads(); @@ -162,6 +163,7 @@ public void testResetStateEarliest() throws InterruptedException { StreamPoller.State.NONE, 1000, 1000, + 10000, indexSettings ); CountDownLatch latch = new CountDownLatch(2); @@ -190,6 +192,7 @@ public void testResetStateLatest() throws InterruptedException { StreamPoller.State.NONE, 1000, 1000, + 10000, indexSettings ); @@ -214,6 +217,7 @@ public void testResetStateRewindByOffset() throws InterruptedException { StreamPoller.State.NONE, 1000, 1000, + 10000, indexSettings ); CountDownLatch latch = new CountDownLatch(1); @@ -298,6 +302,7 @@ public void testDropErrorIngestionStrategy() throws TimeoutException, Interrupte StreamPoller.State.NONE, 1000, 1000, + 10000, indexSettings ); poller.start(); @@ -359,6 +364,7 @@ public void testBlockErrorIngestionStrategy() throws TimeoutException, Interrupt StreamPoller.State.NONE, 1000, 1000, + 10000, indexSettings ); poller.start(); @@ -394,6 +400,7 @@ public void testProcessingErrorWithBlockErrorIngestionStrategy() throws TimeoutE StreamPoller.State.NONE, 1000, 1000, + 10000, indexSettings ); poller.start(); @@ -465,6 +472,7 @@ public void testPersistedBatchStartPointer() throws TimeoutException, Interrupte StreamPoller.State.NONE, 1000, 1000, + 10000, indexSettings ); poller.start(); @@ -533,6 +541,7 @@ public void testConsumerInitializationRetry() throws Exception { StreamPoller.State.NONE, 1000, 1000, + 10000, indexSettings ); diff --git a/server/src/test/java/org/opensearch/indices/pollingingest/PollingIngestStatsTests.java b/server/src/test/java/org/opensearch/indices/pollingingest/PollingIngestStatsTests.java index ea368d7ac354b..c64f0eaf7fd1e 100644 --- a/server/src/test/java/org/opensearch/indices/pollingingest/PollingIngestStatsTests.java +++ b/server/src/test/java/org/opensearch/indices/pollingingest/PollingIngestStatsTests.java @@ -51,6 +51,8 @@ public void testToXContent() throws IOException { + stats.getConsumerStats().totalDuplicateMessageSkippedCount() + ",\"lag_in_millis\":" + stats.getConsumerStats().lagInMillis() + + ",\"pointer_based_lag\":" + + stats.getConsumerStats().pointerBasedLag() + "}}}"; assertEquals(expected, builder.toString());