Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,27 @@
package org.elasticsearch.datastreams;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.yaml.ObjectPath;

import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class TsdbDataStreamRestIT extends ESRestTestCase {

Expand Down Expand Up @@ -84,6 +88,57 @@ public class TsdbDataStreamRestIT extends ESRestTestCase {
}
}""";

private static final String NON_TSDB_TEMPLATE = """
{
"index_patterns": ["k8s*"],
"template": {
"settings":{
"index": {
"number_of_replicas": 0,
"number_of_shards": 2
}
},
"mappings":{
"properties": {
"@timestamp" : {
"type": "date"
},
"metricset": {
"type": "keyword"
},
"k8s": {
"properties": {
"pod": {
"properties": {
"uid": {
"type": "keyword"
},
"name": {
"type": "keyword"
},
"ip": {
"type": "ip"
},
"network": {
"properties": {
"tx": {
"type": "long"
},
"rx": {
"type": "long"
}
}
}
}
}
}
}
}
}
},
"data_stream": {}
}""";

private static final String DOC = """
{
"@timestamp": "$time",
Expand Down Expand Up @@ -235,6 +290,82 @@ public void testSubsequentRollovers() throws Exception {
}
}

public void testMigrateRegularDataStreamToTsdbDataStream() throws Exception {
// Create a non tsdb template
var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
putComposableIndexTemplateRequest.setJsonEntity(NON_TSDB_TEMPLATE);
assertOK(client().performRequest(putComposableIndexTemplateRequest));

// Index a few docs and sometimes rollover
int numRollovers = 4;
int numDocs = 32;
var currentTime = Instant.now();
var currentMinus30Days = currentTime.minus(30, ChronoUnit.DAYS);
for (int i = 0; i < numRollovers; i++) {
for (int j = 0; j < numDocs; j++) {
var indexRequest = new Request("POST", "/k8s/_doc");
var time = Instant.ofEpochMilli(randomLongBetween(currentMinus30Days.toEpochMilli(), currentTime.toEpochMilli()));
indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(time)));
var response = client().performRequest(indexRequest);
assertOK(response);
var responseBody = entityAsMap(response);
// i rollovers and +1 offset:
assertThat((String) responseBody.get("_index"), backingIndexEqualTo("k8s", i + 1));
}
var rolloverRequest = new Request("POST", "/k8s/_rollover");
var rolloverResponse = client().performRequest(rolloverRequest);
assertOK(rolloverResponse);
var rolloverResponseBody = entityAsMap(rolloverResponse);
assertThat(rolloverResponseBody.get("rolled_over"), is(true));
}

var getDataStreamsRequest = new Request("GET", "/_data_stream");
var getDataStreamResponse = client().performRequest(getDataStreamsRequest);
assertOK(getDataStreamResponse);
var dataStreams = entityAsMap(getDataStreamResponse);
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s"));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(5));
for (int i = 0; i < 5; i++) {
String backingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices." + i + ".index_name");
assertThat(backingIndex, backingIndexEqualTo("k8s", i + 1));
var indices = getIndex(backingIndex);
var escapedBackingIndex = backingIndex.replace(".", "\\.");
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s"));
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), nullValue());
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time"), nullValue());
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time"), nullValue());
}

// Update template
putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
putComposableIndexTemplateRequest.setJsonEntity(TEMPLATE);
assertOK(client().performRequest(putComposableIndexTemplateRequest));

var rolloverRequest = new Request("POST", "/k8s/_rollover");
var rolloverResponse = client().performRequest(rolloverRequest);
assertOK(rolloverResponse);
var rolloverResponseBody = entityAsMap(rolloverResponse);
assertThat(rolloverResponseBody.get("rolled_over"), is(true));
var newIndex = (String) rolloverResponseBody.get("new_index");
assertThat(newIndex, backingIndexEqualTo("k8s", 6));

// Ingest documents that will land in the new tsdb backing index:
for (int i = 0; i < numDocs; i++) {
var indexRequest = new Request("POST", "/k8s/_doc");
indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(currentTime)));
var response = client().performRequest(indexRequest);
assertOK(response);
var responseBody = entityAsMap(response);
assertThat((String) responseBody.get("_index"), backingIndexEqualTo("k8s", 6));
}

// Fail if documents target older non tsdb backing index:
var indexRequest = new Request("POST", "/k8s/_doc");
indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(currentMinus30Days)));
var e = expectThrows(ResponseException.class, () -> client().performRequest(indexRequest));
assertThat(e.getMessage(), containsString("is outside of ranges of currently writable indices"));
}

private static Map<?, ?> getIndex(String indexName) throws IOException {
var getIndexRequest = new Request("GET", "/" + indexName + "?human");
var response = client().performRequest(getIndexRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,14 @@ public Settings getAdditionalIndexSettings(
) {
if (dataStreamName != null) {
DataStream dataStream = metadata.dataStreams().get(dataStreamName);
// First backing index is created and then data stream is rolled over (in a single cluster state update).
// So at this point we can't check index_mode==time_series,
// so checking that index_mode==null and templateIndexMode == TIME_SERIES
boolean migrating = dataStream != null && dataStream.getIndexMode() == null && templateIndexMode == IndexMode.TIME_SERIES;
IndexMode indexMode;
if (dataStream != null) {
if (migrating) {
indexMode = IndexMode.TIME_SERIES;
} else if (dataStream != null) {
indexMode = dataStream.getIndexMode();
} else {
indexMode = templateIndexMode;
Expand All @@ -50,7 +56,7 @@ public Settings getAdditionalIndexSettings(
TimeValue lookAheadTime = IndexSettings.LOOK_AHEAD_TIME.get(allSettings);
final Instant start;
final Instant end;
if (dataStream == null) {
if (dataStream == null || migrating) {
start = resolvedAt.minusMillis(lookAheadTime.getMillis());
end = resolvedAt.plusMillis(lookAheadTime.getMillis());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;

public class MetadataDataStreamRolloverServiceTests extends ESTestCase {

Expand All @@ -63,7 +64,7 @@ public void testRolloverClusterStateForDataStream() throws Exception {
IndexMode.TIME_SERIES
);
ComposableIndexTemplate template = new ComposableIndexTemplate.Builder().indexPatterns(List.of(dataStream.getName() + "*"))
.template(new Template(Settings.builder().put("index.mode", "time_series").build(), null, null))
.template(new Template(Settings.builder().put("index.routing_path", "uid").build(), null, null))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, IndexMode.TIME_SERIES))
.build();
Metadata.Builder builder = Metadata.builder();
Expand All @@ -75,6 +76,7 @@ public void testRolloverClusterStateForDataStream() throws Exception {
.put("index.hidden", true)
.put(SETTING_INDEX_UUID, dataStream.getWriteIndex().getUUID())
.put("index.mode", "time_series")
.put("index.routing_path", "uid")
.put("index.time_series.start_time", FORMATTER.format(now.minus(4, ChronoUnit.HOURS)))
.put("index.time_series.end_time", FORMATTER.format(now.minus(2, ChronoUnit.HOURS)))
)
Expand Down Expand Up @@ -144,4 +146,90 @@ public void testRolloverClusterStateForDataStream() throws Exception {
}
}

public void testRolloverAndMigrateDataStream() throws Exception {
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
String dataStreamName = "logs-my-app";
final DataStream dataStream = new DataStream(
dataStreamName,
new DataStream.TimestampField("@timestamp"),
List.of(new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 1, now.toEpochMilli()), "uuid")),
1,
null,
false,
false,
false,
false,
null
);
ComposableIndexTemplate template = new ComposableIndexTemplate.Builder().indexPatterns(List.of(dataStream.getName() + "*"))
.template(new Template(Settings.builder().put("index.routing_path", "uid").build(), null, null))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, IndexMode.TIME_SERIES))
.build();
Metadata.Builder builder = Metadata.builder();
builder.put("template", template);
builder.put(
IndexMetadata.builder(dataStream.getWriteIndex().getName())
.settings(
ESTestCase.settings(Version.CURRENT)
.put("index.hidden", true)
.put(SETTING_INDEX_UUID, dataStream.getWriteIndex().getUUID())
)
.numberOfShards(1)
.numberOfReplicas(0)
);
builder.put(dataStream);
final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build();

ThreadPool testThreadPool = new TestThreadPool(getTestName());
try {
MetadataRolloverService rolloverService = DataStreamTestHelper.getMetadataRolloverService(
dataStream,
testThreadPool,
Set.of(new DataStreamIndexSettingsProvider()),
xContentRegistry()
);
MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
List<Condition<?>> metConditions = Collections.singletonList(condition);
CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_");

MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState(
clusterState,
dataStream.getName(),
null,
createIndexRequest,
metConditions,
now,
randomBoolean(),
false
);

String sourceIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration());
String newIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration() + 1);
assertEquals(sourceIndexName, rolloverResult.sourceIndexName());
assertEquals(newIndexName, rolloverResult.rolloverIndexName());
Metadata rolloverMetadata = rolloverResult.clusterState().metadata();
assertEquals(dataStream.getIndices().size() + 1, rolloverMetadata.indices().size());

// Assert data stream's index_mode has been changed to time_series.
assertThat(rolloverMetadata.dataStreams().get(dataStreamName), notNullValue());
assertThat(rolloverMetadata.dataStreams().get(dataStreamName).getIndexMode(), equalTo(IndexMode.TIME_SERIES));

// Nothing changed for the original backing index:
IndexMetadata im = rolloverMetadata.index(rolloverMetadata.dataStreams().get(dataStreamName).getIndices().get(0));
assertThat(IndexSettings.MODE.exists(im.getSettings()), is(false));
assertThat(IndexSettings.TIME_SERIES_START_TIME.exists(im.getSettings()), is(false));
assertThat(IndexSettings.TIME_SERIES_END_TIME.exists(im.getSettings()), is(false));
// New backing index is a tsdb index:
im = rolloverMetadata.index(rolloverMetadata.dataStreams().get(dataStreamName).getIndices().get(1));
assertThat(IndexSettings.MODE.get(im.getSettings()), equalTo(IndexMode.TIME_SERIES));
Instant startTime = IndexSettings.TIME_SERIES_START_TIME.get(im.getSettings());
Instant endTime = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
assertThat(startTime.isBefore(endTime), is(true));
assertThat(startTime, equalTo(now.minus(2, ChronoUnit.HOURS)));
assertThat(endTime, equalTo(now.plus(2, ChronoUnit.HOURS)));
} finally {
testThreadPool.shutdown();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,9 @@ private RolloverResult rolloverDataStream(
currentState,
createIndexClusterStateRequest,
silent,
(builder, indexMetadata) -> builder.put(ds.rollover(indexMetadata.getIndex(), newGeneration))
(builder, indexMetadata) -> builder.put(
ds.rollover(indexMetadata.getIndex(), newGeneration, templateV2.getDataStreamTemplate().getIndexMode())
)
);

RolloverInfo rolloverInfo = new RolloverInfo(dataStreamName, metConditions, threadPool.absoluteTimeInMillis());
Expand Down
Loading