Skip to content

Commit e1167ff

Browse files
authored
DataStreamStatsAction can get maxTimestamp from skippers (#138158)
The `@timestamp` field in logsdb no longer stores data in a points index, so we need to retrieve the max timestamp field form the docvalues skipper instead. Fixes #137208
1 parent 79e62df commit e1167ff

File tree

2 files changed

+58
-14
lines changed

2 files changed

+58
-14
lines changed

modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportDataStreamsStatsAction.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.datastreams.action;
1010

1111
import org.apache.lucene.document.LongPoint;
12+
import org.apache.lucene.index.DocValuesSkipper;
1213
import org.apache.lucene.index.IndexReader;
1314
import org.apache.lucene.index.PointValues;
1415
import org.elasticsearch.action.ActionListener;
@@ -129,18 +130,21 @@ protected void shardOperation(
129130
assert indexAbstraction != null;
130131
DataStream dataStream = indexAbstraction.getParentDataStream();
131132
assert dataStream != null;
132-
long maxTimestamp = 0L;
133-
try (Engine.Searcher searcher = indexShard.acquireSearcher(ReadOnlyEngine.FIELD_RANGE_SEARCH_SOURCE)) {
134-
IndexReader indexReader = searcher.getIndexReader();
135-
byte[] maxPackedValue = PointValues.getMaxPackedValue(indexReader, DataStream.TIMESTAMP_FIELD_NAME);
136-
if (maxPackedValue != null) {
137-
maxTimestamp = LongPoint.decodeDimension(maxPackedValue, 0);
138-
}
139-
}
140-
return new DataStreamsStatsAction.DataStreamShardStats(indexShard.routingEntry(), storeStats, maxTimestamp);
133+
return new DataStreamsStatsAction.DataStreamShardStats(indexShard.routingEntry(), storeStats, getMaxTimestamp(indexShard));
141134
});
142135
}
143136

137+
private static long getMaxTimestamp(IndexShard indexShard) throws IOException {
138+
try (Engine.Searcher searcher = indexShard.acquireSearcher(ReadOnlyEngine.FIELD_RANGE_SEARCH_SOURCE)) {
139+
IndexReader indexReader = searcher.getIndexReader();
140+
byte[] maxPackedValue = PointValues.getMaxPackedValue(indexReader, DataStream.TIMESTAMP_FIELD_NAME);
141+
if (maxPackedValue != null) {
142+
return LongPoint.decodeDimension(maxPackedValue, 0);
143+
}
144+
return DocValuesSkipper.globalMaxValue(searcher, DataStream.TIMESTAMP_FIELD_NAME);
145+
}
146+
}
147+
144148
@Override
145149
protected String[] resolveConcreteIndexNames(ClusterState clusterState, DataStreamsStatsAction.Request request) {
146150
return DataStreamsActionUtil.resolveConcreteIndexNames(

modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamsStatsTests.java

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
import org.elasticsearch.cluster.metadata.DataStreamOptions;
3030
import org.elasticsearch.cluster.metadata.Template;
3131
import org.elasticsearch.common.compress.CompressedXContent;
32+
import org.elasticsearch.common.settings.Settings;
33+
import org.elasticsearch.index.IndexMode;
34+
import org.elasticsearch.index.IndexSettings;
3235
import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
3336
import org.elasticsearch.plugins.Plugin;
3437
import org.elasticsearch.test.ESSingleNodeTestCase;
@@ -126,7 +129,7 @@ public void testStatsExistingDataStream() throws Exception {
126129

127130
public void testStatsExistingDataStreamWithFailureStores() throws Exception {
128131
maybeCreateCreatedStandAloneIndicesIndex();
129-
String dataStreamName = createDataStream(false, true);
132+
String dataStreamName = createDataStream(false, true, false);
130133
createFailedDocument(dataStreamName);
131134

132135
DataStreamsStatsAction.Response stats = getDataStreamsStats();
@@ -149,7 +152,7 @@ public void testStatsExistingDataStreamWithFailureStores() throws Exception {
149152

150153
public void testStatsExistingHiddenDataStream() throws Exception {
151154
maybeCreateCreatedStandAloneIndicesIndex();
152-
String dataStreamName = createDataStream(true, false);
155+
String dataStreamName = createDataStream(true, false, false);
153156
long timestamp = createDocument(dataStreamName);
154157

155158
DataStreamsStatsAction.Response stats = getDataStreamsStats(true);
@@ -264,16 +267,53 @@ public void testStatsMultipleDataStreams() throws Exception {
264267
}
265268
}
266269

270+
public void testStatsMultipleDataStreamsWithSparseTimestampIndexes() throws Exception {
271+
maybeCreateCreatedStandAloneIndicesIndex();
272+
for (int dataStreamCount = 0; dataStreamCount < (2 + randomInt(3)); dataStreamCount++) {
273+
createDataStream(false, false, true);
274+
}
275+
276+
// Create a number of documents in each data stream
277+
Map<String, Long> maxTimestamps = new HashMap<>();
278+
for (String createdDataStream : createdDataStreams) {
279+
for (int documentCount = 0; documentCount < (1 + randomInt(10)); documentCount++) {
280+
long ts = createDocument(createdDataStream);
281+
long maxTS = max(maxTimestamps.getOrDefault(createdDataStream, 0L), ts);
282+
maxTimestamps.put(createdDataStream, maxTS);
283+
}
284+
}
285+
286+
DataStreamsStatsAction.Response stats = getDataStreamsStats();
287+
logger.error(stats.toString());
288+
assertEquals(createdDataStreams.size(), stats.getSuccessfulShards());
289+
assertEquals(0, stats.getFailedShards());
290+
assertEquals(createdDataStreams.size(), stats.getDataStreamCount());
291+
assertEquals(createdDataStreams.size(), stats.getBackingIndices());
292+
assertNotEquals(0L, stats.getTotalStoreSize().getBytes());
293+
assertEquals(createdDataStreams.size(), stats.getDataStreams().length);
294+
for (DataStreamsStatsAction.DataStreamStats dataStreamStats : stats.getDataStreams()) {
295+
Long expectedMaxTS = maxTimestamps.get(dataStreamStats.getDataStream());
296+
assertNotNull("All indices should have max timestamps", expectedMaxTS);
297+
assertEquals(1, dataStreamStats.getBackingIndices());
298+
assertEquals(expectedMaxTS.longValue(), dataStreamStats.getMaximumTimestamp());
299+
assertNotEquals(0L, dataStreamStats.getStoreSize().getBytes());
300+
}
301+
}
302+
267303
private String createDataStream() throws Exception {
268-
return createDataStream(false, false);
304+
return createDataStream(false, false, false);
269305
}
270306

271-
private String createDataStream(boolean hidden, boolean failureStore) throws Exception {
307+
private String createDataStream(boolean hidden, boolean failureStore, boolean logsMode) throws Exception {
272308
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.getDefault());
273309
DataStreamOptions.Template failureStoreOptions = failureStore == false
274310
? null
275311
: new DataStreamOptions.Template(DataStreamFailureStore.builder().enabled(true).buildTemplate());
276-
Template idxTemplate = new Template(null, new CompressedXContent("""
312+
Settings.Builder settingsBuilder = Settings.builder();
313+
if (randomBoolean()) {
314+
settingsBuilder.put(IndexSettings.MODE.getKey(), logsMode ? IndexMode.LOGSDB.getName() : IndexMode.STANDARD.getName());
315+
}
316+
Template idxTemplate = new Template(settingsBuilder.build(), new CompressedXContent("""
277317
{"properties":{"@timestamp":{"type":"date"},"data":{"type":"keyword"}}}
278318
"""), null, null, failureStoreOptions);
279319
ComposableIndexTemplate template = ComposableIndexTemplate.builder()

0 commit comments

Comments
 (0)