@@ -9,6 +9,7 @@
 package org.elasticsearch.datastreams.action;
 
 import org.apache.lucene.document.LongPoint;
+import org.apache.lucene.index.DocValuesSkipper;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.PointValues;
 import org.elasticsearch.action.ActionListener;
@@ -129,18 +130,21 @@ protected void shardOperation(
             assert indexAbstraction != null;
             DataStream dataStream = indexAbstraction.getParentDataStream();
             assert dataStream != null;
-            long maxTimestamp = 0L;
-            try (Engine.Searcher searcher = indexShard.acquireSearcher(ReadOnlyEngine.FIELD_RANGE_SEARCH_SOURCE)) {
-                IndexReader indexReader = searcher.getIndexReader();
-                byte[] maxPackedValue = PointValues.getMaxPackedValue(indexReader, DataStream.TIMESTAMP_FIELD_NAME);
-                if (maxPackedValue != null) {
-                    maxTimestamp = LongPoint.decodeDimension(maxPackedValue, 0);
-                }
-            }
-            return new DataStreamsStatsAction.DataStreamShardStats(indexShard.routingEntry(), storeStats, maxTimestamp);
+            return new DataStreamsStatsAction.DataStreamShardStats(indexShard.routingEntry(), storeStats, getMaxTimestamp(indexShard));
         });
     }
 
+    private static long getMaxTimestamp(IndexShard indexShard) throws IOException {
+        try (Engine.Searcher searcher = indexShard.acquireSearcher(ReadOnlyEngine.FIELD_RANGE_SEARCH_SOURCE)) {
+            IndexReader indexReader = searcher.getIndexReader();
+            byte[] maxPackedValue = PointValues.getMaxPackedValue(indexReader, DataStream.TIMESTAMP_FIELD_NAME);
+            if (maxPackedValue != null) {
+                return LongPoint.decodeDimension(maxPackedValue, 0);
+            }
+            return DocValuesSkipper.globalMaxValue(searcher, DataStream.TIMESTAMP_FIELD_NAME);
+        }
+    }
+
     @Override
     protected String[] resolveConcreteIndexNames(ClusterState clusterState, DataStreamsStatsAction.Request request) {
         return DataStreamsActionUtil.resolveConcreteIndexNames(
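The interesting part of this change is the fallback: when PointValues.getMaxPackedValue returns null (for example when @timestamp is not indexed with points, as the new logsdb-mode test below suggests), the maximum is taken from the field's doc-values skippers instead. Below is a minimal sketch of a skipper-based lookup, assuming the globalMaxValue helper aggregates per-leaf skippers roughly like this; the class and method names here and the 0L default (which mirrors the old maxTimestamp initialisation) are illustrative only, not the PR's actual implementation.

```java
import java.io.IOException;

import org.apache.lucene.index.DocValuesSkipper;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;

final class SkipperMaxTimestampSketch {

    // Hedged sketch: walk each segment's DocValuesSkipper for the field and
    // take the largest per-segment maximum. Returns 0L when no segment has a
    // skipper for the field, mirroring the previous default of 0.
    static long maxValueFromSkippers(IndexReader reader, String field) throws IOException {
        long max = 0L;
        for (LeafReaderContext leaf : reader.leaves()) {
            DocValuesSkipper skipper = leaf.reader().getDocValuesSkipper(field);
            if (skipper != null) {
                // maxValue() without a level argument is the max across the whole segment
                max = Math.max(max, skipper.maxValue());
            }
        }
        return max;
    }
}
```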
@@ -29,6 +29,9 @@
 import org.elasticsearch.cluster.metadata.DataStreamOptions;
 import org.elasticsearch.cluster.metadata.Template;
 import org.elasticsearch.common.compress.CompressedXContent;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexMode;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESSingleNodeTestCase;
@@ -126,7 +129,7 @@ public void testStatsExistingDataStream() throws Exception {
 
     public void testStatsExistingDataStreamWithFailureStores() throws Exception {
         maybeCreateCreatedStandAloneIndicesIndex();
-        String dataStreamName = createDataStream(false, true);
+        String dataStreamName = createDataStream(false, true, false);
         createFailedDocument(dataStreamName);
 
         DataStreamsStatsAction.Response stats = getDataStreamsStats();
@@ -149,7 +152,7 @@ public void testStatsExistingDataStreamWithFailureStores() throws Exception {
 
     public void testStatsExistingHiddenDataStream() throws Exception {
         maybeCreateCreatedStandAloneIndicesIndex();
-        String dataStreamName = createDataStream(true, false);
+        String dataStreamName = createDataStream(true, false, false);
         long timestamp = createDocument(dataStreamName);
 
         DataStreamsStatsAction.Response stats = getDataStreamsStats(true);
@@ -264,16 +267,53 @@ public void testStatsMultipleDataStreams() throws Exception {
         }
     }
 
+    public void testStatsMultipleDataStreamsWithSparseTimestampIndexes() throws Exception {
+        maybeCreateCreatedStandAloneIndicesIndex();
+        for (int dataStreamCount = 0; dataStreamCount < (2 + randomInt(3)); dataStreamCount++) {
+            createDataStream(false, false, true);
+        }
+
+        // Create a number of documents in each data stream
+        Map<String, Long> maxTimestamps = new HashMap<>();
+        for (String createdDataStream : createdDataStreams) {
+            for (int documentCount = 0; documentCount < (1 + randomInt(10)); documentCount++) {
+                long ts = createDocument(createdDataStream);
+                long maxTS = max(maxTimestamps.getOrDefault(createdDataStream, 0L), ts);
+                maxTimestamps.put(createdDataStream, maxTS);
+            }
+        }
+
+        DataStreamsStatsAction.Response stats = getDataStreamsStats();
+        logger.error(stats.toString());
+        assertEquals(createdDataStreams.size(), stats.getSuccessfulShards());
+        assertEquals(0, stats.getFailedShards());
+        assertEquals(createdDataStreams.size(), stats.getDataStreamCount());
+        assertEquals(createdDataStreams.size(), stats.getBackingIndices());
+        assertNotEquals(0L, stats.getTotalStoreSize().getBytes());
+        assertEquals(createdDataStreams.size(), stats.getDataStreams().length);
+        for (DataStreamsStatsAction.DataStreamStats dataStreamStats : stats.getDataStreams()) {
+            Long expectedMaxTS = maxTimestamps.get(dataStreamStats.getDataStream());
+            assertNotNull("All indices should have max timestamps", expectedMaxTS);
+            assertEquals(1, dataStreamStats.getBackingIndices());
+            assertEquals(expectedMaxTS.longValue(), dataStreamStats.getMaximumTimestamp());
+            assertNotEquals(0L, dataStreamStats.getStoreSize().getBytes());
+        }
+    }
+
     private String createDataStream() throws Exception {
-        return createDataStream(false, false);
+        return createDataStream(false, false, false);
     }
 
-    private String createDataStream(boolean hidden, boolean failureStore) throws Exception {
+    private String createDataStream(boolean hidden, boolean failureStore, boolean logsMode) throws Exception {
         String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.getDefault());
         DataStreamOptions.Template failureStoreOptions = failureStore == false
             ? null
             : new DataStreamOptions.Template(DataStreamFailureStore.builder().enabled(true).buildTemplate());
+        Settings.Builder settingsBuilder = Settings.builder();
+        if (randomBoolean()) {

Member: Up to you, but I'd prefer to have two tests, one for logsdb and one not. I subscribe to the view (there was a discussion on Slack about this) that randomization in tests should be used for stuff you don't care about, whereas in this case we actually care about both cases, and it'd be better if we definitely found out right away if we broke it rather than only having a 50/50 chance.

Contributor Author: Fair point, I'll update.

Member: Thanks!

+            settingsBuilder.put(IndexSettings.MODE.getKey(), logsMode ? IndexMode.LOGSDB.getName() : IndexMode.STANDARD.getName());
+        }
+        Template idxTemplate = new Template(settingsBuilder.build(), new CompressedXContent("""
             {"properties":{"@timestamp":{"type":"date"},"data":{"type":"keyword"}}}
             """), null, null, failureStoreOptions);
         ComposableIndexTemplate template = ComposableIndexTemplate.builder()
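Following the review thread above, here is a minimal sketch of what splitting the randomized toggle into two deterministic tests could look like. The test names and the assertMaxTimestampIsReported helper are assumptions for illustration, and the sketch presumes createDataStream(hidden, failureStore, logsMode) applies the logsdb setting whenever logsMode is true (i.e. without randomBoolean()); createDataStream, createDocument, and getDataStreamsStats are the helpers already used in this test class. The final PR may have resolved the comment differently.

```java
// Sketch only: deterministic coverage of both index modes, per the review suggestion.
public void testStatsMaxTimestampStandardIndexMode() throws Exception {
    assertMaxTimestampIsReported(false);
}

public void testStatsMaxTimestampLogsdbIndexMode() throws Exception {
    assertMaxTimestampIsReported(true);
}

private void assertMaxTimestampIsReported(boolean logsMode) throws Exception {
    // Hypothetical helper: index a few documents and check the reported max timestamp
    String dataStreamName = createDataStream(false, false, logsMode);
    long maxTimestamp = 0L;
    for (int i = 0; i < 1 + randomInt(10); i++) {
        maxTimestamp = Math.max(maxTimestamp, createDocument(dataStreamName));
    }
    DataStreamsStatsAction.Response stats = getDataStreamsStats();
    for (DataStreamsStatsAction.DataStreamStats dataStreamStats : stats.getDataStreams()) {
        if (dataStreamName.equals(dataStreamStats.getDataStream())) {
            assertEquals(maxTimestamp, dataStreamStats.getMaximumTimestamp());
        }
    }
}
```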