Skip to content

Commit ae7defa

Browse files
authored
Allow regular data streams to be migrated to tsdb data streams. (#83843)
A regular data stream can be migrated to a tsdb data stream if in template that created the data stream, the `index_mode` field is set to `time_series` and the data stream's `index_mode` property is either not specified or set to `standard`. Then on the next rollover the data stream is migrated to be a tsdb data stream. When that happens the data stream's `index_mode` property is set to `time_series` and the new backing index's `index.mode` index setting is also set to `time_series`. Closes #83520
1 parent f9a64b2 commit ae7defa

File tree

8 files changed

+425
-21
lines changed

8 files changed

+425
-21
lines changed

modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/TsdbDataStreamRestIT.java

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,27 @@
88
package org.elasticsearch.datastreams;
99

1010
import org.elasticsearch.client.Request;
11+
import org.elasticsearch.client.ResponseException;
1112
import org.elasticsearch.common.time.DateFormatter;
1213
import org.elasticsearch.common.time.FormatNames;
1314
import org.elasticsearch.test.rest.ESRestTestCase;
1415
import org.elasticsearch.test.rest.yaml.ObjectPath;
1516

1617
import java.io.IOException;
1718
import java.time.Instant;
19+
import java.time.temporal.ChronoUnit;
1820
import java.util.Map;
1921

2022
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
2123
import static org.hamcrest.Matchers.aMapWithSize;
2224
import static org.hamcrest.Matchers.contains;
25+
import static org.hamcrest.Matchers.containsString;
2326
import static org.hamcrest.Matchers.empty;
2427
import static org.hamcrest.Matchers.equalTo;
2528
import static org.hamcrest.Matchers.hasSize;
2629
import static org.hamcrest.Matchers.is;
2730
import static org.hamcrest.Matchers.notNullValue;
31+
import static org.hamcrest.Matchers.nullValue;
2832

2933
public class TsdbDataStreamRestIT extends ESRestTestCase {
3034

@@ -84,6 +88,57 @@ public class TsdbDataStreamRestIT extends ESRestTestCase {
8488
}
8589
}""";
8690

91+
private static final String NON_TSDB_TEMPLATE = """
92+
{
93+
"index_patterns": ["k8s*"],
94+
"template": {
95+
"settings":{
96+
"index": {
97+
"number_of_replicas": 0,
98+
"number_of_shards": 2
99+
}
100+
},
101+
"mappings":{
102+
"properties": {
103+
"@timestamp" : {
104+
"type": "date"
105+
},
106+
"metricset": {
107+
"type": "keyword"
108+
},
109+
"k8s": {
110+
"properties": {
111+
"pod": {
112+
"properties": {
113+
"uid": {
114+
"type": "keyword"
115+
},
116+
"name": {
117+
"type": "keyword"
118+
},
119+
"ip": {
120+
"type": "ip"
121+
},
122+
"network": {
123+
"properties": {
124+
"tx": {
125+
"type": "long"
126+
},
127+
"rx": {
128+
"type": "long"
129+
}
130+
}
131+
}
132+
}
133+
}
134+
}
135+
}
136+
}
137+
}
138+
},
139+
"data_stream": {}
140+
}""";
141+
87142
private static final String DOC = """
88143
{
89144
"@timestamp": "$time",
@@ -235,6 +290,82 @@ public void testSubsequentRollovers() throws Exception {
235290
}
236291
}
237292

293+
public void testMigrateRegularDataStreamToTsdbDataStream() throws Exception {
294+
// Create a non tsdb template
295+
var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
296+
putComposableIndexTemplateRequest.setJsonEntity(NON_TSDB_TEMPLATE);
297+
assertOK(client().performRequest(putComposableIndexTemplateRequest));
298+
299+
// Index a few docs and sometimes rollover
300+
int numRollovers = 4;
301+
int numDocs = 32;
302+
var currentTime = Instant.now();
303+
var currentMinus30Days = currentTime.minus(30, ChronoUnit.DAYS);
304+
for (int i = 0; i < numRollovers; i++) {
305+
for (int j = 0; j < numDocs; j++) {
306+
var indexRequest = new Request("POST", "/k8s/_doc");
307+
var time = Instant.ofEpochMilli(randomLongBetween(currentMinus30Days.toEpochMilli(), currentTime.toEpochMilli()));
308+
indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(time)));
309+
var response = client().performRequest(indexRequest);
310+
assertOK(response);
311+
var responseBody = entityAsMap(response);
312+
// i rollovers and +1 offset:
313+
assertThat((String) responseBody.get("_index"), backingIndexEqualTo("k8s", i + 1));
314+
}
315+
var rolloverRequest = new Request("POST", "/k8s/_rollover");
316+
var rolloverResponse = client().performRequest(rolloverRequest);
317+
assertOK(rolloverResponse);
318+
var rolloverResponseBody = entityAsMap(rolloverResponse);
319+
assertThat(rolloverResponseBody.get("rolled_over"), is(true));
320+
}
321+
322+
var getDataStreamsRequest = new Request("GET", "/_data_stream");
323+
var getDataStreamResponse = client().performRequest(getDataStreamsRequest);
324+
assertOK(getDataStreamResponse);
325+
var dataStreams = entityAsMap(getDataStreamResponse);
326+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s"));
327+
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(5));
328+
for (int i = 0; i < 5; i++) {
329+
String backingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices." + i + ".index_name");
330+
assertThat(backingIndex, backingIndexEqualTo("k8s", i + 1));
331+
var indices = getIndex(backingIndex);
332+
var escapedBackingIndex = backingIndex.replace(".", "\\.");
333+
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s"));
334+
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), nullValue());
335+
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time"), nullValue());
336+
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time"), nullValue());
337+
}
338+
339+
// Update template
340+
putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
341+
putComposableIndexTemplateRequest.setJsonEntity(TEMPLATE);
342+
assertOK(client().performRequest(putComposableIndexTemplateRequest));
343+
344+
var rolloverRequest = new Request("POST", "/k8s/_rollover");
345+
var rolloverResponse = client().performRequest(rolloverRequest);
346+
assertOK(rolloverResponse);
347+
var rolloverResponseBody = entityAsMap(rolloverResponse);
348+
assertThat(rolloverResponseBody.get("rolled_over"), is(true));
349+
var newIndex = (String) rolloverResponseBody.get("new_index");
350+
assertThat(newIndex, backingIndexEqualTo("k8s", 6));
351+
352+
// Ingest documents that will land in the new tsdb backing index:
353+
for (int i = 0; i < numDocs; i++) {
354+
var indexRequest = new Request("POST", "/k8s/_doc");
355+
indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(currentTime)));
356+
var response = client().performRequest(indexRequest);
357+
assertOK(response);
358+
var responseBody = entityAsMap(response);
359+
assertThat((String) responseBody.get("_index"), backingIndexEqualTo("k8s", 6));
360+
}
361+
362+
// Fail if documents target older non tsdb backing index:
363+
var indexRequest = new Request("POST", "/k8s/_doc");
364+
indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(currentMinus30Days)));
365+
var e = expectThrows(ResponseException.class, () -> client().performRequest(indexRequest));
366+
assertThat(e.getMessage(), containsString("is outside of ranges of currently writable indices"));
367+
}
368+
238369
private static Map<?, ?> getIndex(String indexName) throws IOException {
239370
var getIndexRequest = new Request("GET", "/" + indexName + "?human");
240371
var response = client().performRequest(getIndexRequest);

modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamIndexSettingsProvider.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,16 @@ public Settings getAdditionalIndexSettings(
3636
) {
3737
if (dataStreamName != null) {
3838
DataStream dataStream = metadata.dataStreams().get(dataStreamName);
39+
// First backing index is created and then data stream is rolled over (in a single cluster state update).
40+
// So at this point we can't check index_mode==time_series,
41+
// so checking that index_mode==null|standard and templateIndexMode == TIME_SERIES
42+
boolean migrating = dataStream != null
43+
&& (dataStream.getIndexMode() == null || dataStream.getIndexMode() == IndexMode.STANDARD)
44+
&& templateIndexMode == IndexMode.TIME_SERIES;
3945
IndexMode indexMode;
40-
if (dataStream != null) {
46+
if (migrating) {
47+
indexMode = IndexMode.TIME_SERIES;
48+
} else if (dataStream != null) {
4149
indexMode = dataStream.getIndexMode();
4250
} else {
4351
indexMode = templateIndexMode;
@@ -50,7 +58,7 @@ public Settings getAdditionalIndexSettings(
5058
TimeValue lookAheadTime = IndexSettings.LOOK_AHEAD_TIME.get(allSettings);
5159
final Instant start;
5260
final Instant end;
53-
if (dataStream == null) {
61+
if (dataStream == null || migrating) {
5462
start = resolvedAt.minusMillis(lookAheadTime.getMillis());
5563
end = resolvedAt.plusMillis(lookAheadTime.getMillis());
5664
} else {

0 commit comments

Comments
 (0)