diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java index c5026a660210f..a518b6cc4670d 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java @@ -9,6 +9,7 @@ package org.elasticsearch.datastreams.action; import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.DocValuesSkipper; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.PointValues; import org.elasticsearch.action.ActionListener; @@ -129,18 +130,21 @@ protected void shardOperation( assert indexAbstraction != null; DataStream dataStream = indexAbstraction.getParentDataStream(); assert dataStream != null; - long maxTimestamp = 0L; - try (Engine.Searcher searcher = indexShard.acquireSearcher(ReadOnlyEngine.FIELD_RANGE_SEARCH_SOURCE)) { - IndexReader indexReader = searcher.getIndexReader(); - byte[] maxPackedValue = PointValues.getMaxPackedValue(indexReader, DataStream.TIMESTAMP_FIELD_NAME); - if (maxPackedValue != null) { - maxTimestamp = LongPoint.decodeDimension(maxPackedValue, 0); - } - } - return new DataStreamsStatsAction.DataStreamShardStats(indexShard.routingEntry(), storeStats, maxTimestamp); + return new DataStreamsStatsAction.DataStreamShardStats(indexShard.routingEntry(), storeStats, getMaxTimestamp(indexShard)); }); } + private static long getMaxTimestamp(IndexShard indexShard) throws IOException { + try (Engine.Searcher searcher = indexShard.acquireSearcher(ReadOnlyEngine.FIELD_RANGE_SEARCH_SOURCE)) { + IndexReader indexReader = searcher.getIndexReader(); + byte[] maxPackedValue = PointValues.getMaxPackedValue(indexReader, DataStream.TIMESTAMP_FIELD_NAME); + if (maxPackedValue != null) { + return LongPoint.decodeDimension(maxPackedValue, 0); + } + return DocValuesSkipper.globalMaxValue(searcher, DataStream.TIMESTAMP_FIELD_NAME); + } + } + @Override protected String[] resolveConcreteIndexNames(ClusterState clusterState, DataStreamsStatsAction.Request request) { return DataStreamsActionUtil.resolveConcreteIndexNames( diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamsStatsTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamsStatsTests.java index 6f0f2935449da..f70a1183ca64d 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamsStatsTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamsStatsTests.java @@ -29,6 +29,9 @@ import org.elasticsearch.cluster.metadata.DataStreamOptions; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; @@ -126,7 +129,7 @@ public void testStatsExistingDataStream() throws Exception { public void testStatsExistingDataStreamWithFailureStores() throws Exception { maybeCreateCreatedStandAloneIndicesIndex(); - String dataStreamName = createDataStream(false, true); + String dataStreamName = createDataStream(false, true, false); createFailedDocument(dataStreamName); DataStreamsStatsAction.Response stats = getDataStreamsStats(); @@ -149,7 +152,7 @@ public void testStatsExistingDataStreamWithFailureStores() throws Exception { public void testStatsExistingHiddenDataStream() throws Exception { maybeCreateCreatedStandAloneIndicesIndex(); - String dataStreamName = createDataStream(true, false); + String dataStreamName = createDataStream(true, false, false); long timestamp = createDocument(dataStreamName); DataStreamsStatsAction.Response stats = getDataStreamsStats(true); @@ -264,16 +267,53 @@ public void testStatsMultipleDataStreams() throws Exception { } } + public void testStatsMultipleDataStreamsWithSparseTimestampIndexes() throws Exception { + maybeCreateCreatedStandAloneIndicesIndex(); + for (int dataStreamCount = 0; dataStreamCount < (2 + randomInt(3)); dataStreamCount++) { + createDataStream(false, false, true); + } + + // Create a number of documents in each data stream + Map maxTimestamps = new HashMap<>(); + for (String createdDataStream : createdDataStreams) { + for (int documentCount = 0; documentCount < (1 + randomInt(10)); documentCount++) { + long ts = createDocument(createdDataStream); + long maxTS = max(maxTimestamps.getOrDefault(createdDataStream, 0L), ts); + maxTimestamps.put(createdDataStream, maxTS); + } + } + + DataStreamsStatsAction.Response stats = getDataStreamsStats(); + logger.error(stats.toString()); + assertEquals(createdDataStreams.size(), stats.getSuccessfulShards()); + assertEquals(0, stats.getFailedShards()); + assertEquals(createdDataStreams.size(), stats.getDataStreamCount()); + assertEquals(createdDataStreams.size(), stats.getBackingIndices()); + assertNotEquals(0L, stats.getTotalStoreSize().getBytes()); + assertEquals(createdDataStreams.size(), stats.getDataStreams().length); + for (DataStreamsStatsAction.DataStreamStats dataStreamStats : stats.getDataStreams()) { + Long expectedMaxTS = maxTimestamps.get(dataStreamStats.getDataStream()); + assertNotNull("All indices should have max timestamps", expectedMaxTS); + assertEquals(1, dataStreamStats.getBackingIndices()); + assertEquals(expectedMaxTS.longValue(), dataStreamStats.getMaximumTimestamp()); + assertNotEquals(0L, dataStreamStats.getStoreSize().getBytes()); + } + } + private String createDataStream() throws Exception { - return createDataStream(false, false); + return createDataStream(false, false, false); } - private String createDataStream(boolean hidden, boolean failureStore) throws Exception { + private String createDataStream(boolean hidden, boolean failureStore, boolean logsMode) throws Exception { String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.getDefault()); DataStreamOptions.Template failureStoreOptions = failureStore == false ? null : new DataStreamOptions.Template(DataStreamFailureStore.builder().enabled(true).buildTemplate()); - Template idxTemplate = new Template(null, new CompressedXContent(""" + Settings.Builder settingsBuilder = Settings.builder(); + if (randomBoolean()) { + settingsBuilder.put(IndexSettings.MODE.getKey(), logsMode ? IndexMode.LOGSDB.getName() : IndexMode.STANDARD.getName()); + } + Template idxTemplate = new Template(settingsBuilder.build(), new CompressedXContent(""" {"properties":{"@timestamp":{"type":"date"},"data":{"type":"keyword"}}} """), null, null, failureStoreOptions); ComposableIndexTemplate template = ComposableIndexTemplate.builder()