Skip to content

Commit 0c61bc6

Browse files
authored
Backport: auto create data streams using index templates v2 (#56596)
Backport: #55377 This commit adds the ability to auto create data streams using index templates v2. Index templates (v2) now have a data_steam field that includes a timestamp field, if provided and index name matches with that template then a data stream (plus first backing index) is auto created. Relates to #53100
1 parent b449661 commit 0c61bc6

File tree

33 files changed

+792
-259
lines changed

33 files changed

+792
-259
lines changed

client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesClientIT.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2039,7 +2039,8 @@ public void testIndexTemplates() throws Exception {
20392039
AliasMetadata alias = AliasMetadata.builder("alias").writeIndex(true).build();
20402040
Template template = new Template(settings, mappings, Collections.singletonMap("alias", alias));
20412041
List<String> pattern = Collections.singletonList("pattern");
2042-
IndexTemplateV2 indexTemplate = new IndexTemplateV2(pattern, template, Collections.emptyList(), 1L, 1L, new HashMap<>());
2042+
IndexTemplateV2 indexTemplate =
2043+
new IndexTemplateV2(pattern, template, Collections.emptyList(), 1L, 1L, new HashMap<>(), null);
20432044
PutIndexTemplateV2Request putIndexTemplateV2Request =
20442045
new PutIndexTemplateV2Request().name(templateName).create(true).indexTemplate(indexTemplate);
20452046

@@ -2085,7 +2086,8 @@ public void testSimulateIndexTemplate() throws Exception {
20852086
AliasMetadata alias = AliasMetadata.builder("alias").writeIndex(true).build();
20862087
Template template = new Template(settings, mappings, org.elasticsearch.common.collect.Map.of("alias", alias));
20872088
List<String> pattern = org.elasticsearch.common.collect.List.of("pattern");
2088-
IndexTemplateV2 indexTemplate = new IndexTemplateV2(pattern, template, Collections.emptyList(), 1L, 1L, new HashMap<>());
2089+
IndexTemplateV2 indexTemplate =
2090+
new IndexTemplateV2(pattern, template, Collections.emptyList(), 1L, 1L, new HashMap<>(), null);
20892091
PutIndexTemplateV2Request putIndexTemplateV2Request =
20902092
new PutIndexTemplateV2Request().name(templateName).create(true).indexTemplate(indexTemplate);
20912093

@@ -2097,7 +2099,7 @@ public void testSimulateIndexTemplate() throws Exception {
20972099
AliasMetadata simulationAlias = AliasMetadata.builder("simulation-alias").writeIndex(true).build();
20982100
IndexTemplateV2 simulationTemplate = new IndexTemplateV2(pattern, new Template(null, null,
20992101
org.elasticsearch.common.collect.Map.of("simulation-alias", simulationAlias)), Collections.emptyList(), 2L, 1L,
2100-
new HashMap<>());
2102+
new HashMap<>(), null);
21012103
PutIndexTemplateV2Request newIndexTemplateReq =
21022104
new PutIndexTemplateV2Request().name("used-for-simulation").create(true).indexTemplate(indexTemplate);
21032105
newIndexTemplateReq.indexTemplate(simulationTemplate);

client/rest-high-level/src/test/java/org/elasticsearch/client/indices/GetIndexTemplatesV2ResponseTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ private static IndexTemplateV2 randomIndexTemplate() {
7575
List<String> patterns = Arrays.asList(generateRandomStringArray(10, 10, false, false));
7676
List<String> composedOf = null;
7777
Map<String, Object> meta = null;
78+
IndexTemplateV2.DataStreamTemplate dataStreamTemplate = null;
7879
if (randomBoolean()) {
7980
composedOf = Arrays.asList(generateRandomStringArray(10, 10, false, false));
8081
}
@@ -84,6 +85,9 @@ private static IndexTemplateV2 randomIndexTemplate() {
8485

8586
Long priority = randomBoolean() ? null : randomNonNegativeLong();
8687
Long version = randomBoolean() ? null : randomNonNegativeLong();
87-
return new IndexTemplateV2(patterns, randomTemplate(), composedOf, priority, version, meta);
88+
if (randomBoolean()) {
89+
dataStreamTemplate = new IndexTemplateV2.DataStreamTemplate(randomAlphaOfLength(8));
90+
}
91+
return new IndexTemplateV2(patterns, randomTemplate(), composedOf, priority, version, meta, dataStreamTemplate);
8892
}
8993
}

rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
---
22
"Create data stream":
33
- skip:
4-
version: " - 7.7.99"
5-
reason: available only in 7.8+
4+
version: " - 7.8.99"
5+
reason: "data streams only supported in 7.9+"
66

77
- do:
88
indices.create_data_stream:
@@ -62,8 +62,8 @@
6262
---
6363
"Create data stream with invalid name":
6464
- skip:
65-
version: " - 7.7.99"
66-
reason: available only in 7.8+
65+
version: " - 7.8.99"
66+
reason: "data streams only supported in 7.9+"
6767

6868
- do:
6969
catch: bad_request
@@ -78,8 +78,8 @@
7878
---
7979
"Get data stream":
8080
- skip:
81-
version: " - 7.7.99"
82-
reason: available only in 7.8+
81+
version: " - 7.8.99"
82+
reason: "data streams only supported in 7.9+"
8383

8484
- do:
8585
indices.create_data_stream:
@@ -147,8 +147,8 @@
147147
---
148148
"Delete data stream with backing indices":
149149
- skip:
150-
version: " - 7.7.99"
151-
reason: available only in 7.8+
150+
version: " - 7.8.99"
151+
reason: data streams only supported in 7.9+
152152

153153
- do:
154154
indices.create_data_stream:

rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/20_unsupported_apis.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
---
22
"Test apis that do not supported data streams":
33
- skip:
4-
version: " - 7.7.99"
5-
reason: available only in 7.8+
4+
version: " - 7.8.99"
5+
reason: "data streams only supported in 7.9+"
66

77
- do:
88
indices.create_data_stream:
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
---
2+
"Put index template":
3+
- skip:
4+
version: " - 7.8.99"
5+
reason: "data streams supported from 7.9"
6+
features: allowed_warnings
7+
8+
- do:
9+
allowed_warnings:
10+
- "index template [test] has index patterns [test-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [test] will take precedence during new index creation"
11+
indices.put_index_template:
12+
name: generic_logs_template
13+
body:
14+
index_patterns: logs-*
15+
data_stream:
16+
timestamp_field: timestamp
17+
template:
18+
settings:
19+
number_of_shards: 1
20+
number_of_replicas: 0
21+
mappings:
22+
properties:
23+
timestamp:
24+
type: date
25+
26+
- do:
27+
index:
28+
index: logs-foobar
29+
refresh: true
30+
body: { foo: bar }
31+
32+
- do:
33+
search:
34+
index: logs-foobar
35+
body: { query: { match_all: {} } }
36+
- length: { hits.hits: 1 }
37+
- match: { hits.hits.0._index: logs-foobar-000001 }
38+
- match: { hits.hits.0._source.foo: 'bar' }
39+
40+
- do:
41+
indices.get_data_streams:
42+
name: logs-foobar
43+
- match: { 0.name: logs-foobar }
44+
- match: { 0.timestamp_field: 'timestamp' }
45+
- length: { 0.indices: 1 }
46+
- match: { 0.indices.0.index_name: 'logs-foobar-000001' }
47+
48+
- do:
49+
indices.delete_data_stream:
50+
name: logs-foobar
51+
- is_true: acknowledged

rest-api-spec/src/main/resources/rest-api-spec/test/indices.delete/20_backing_indices.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
---
22
"Delete backing index on data stream":
33
- skip:
4-
version: " - 7.99.99"
5-
reason: "enable in 7.8+ after backporting"
4+
version: " - 7.8.99"
5+
reason: "data streams only supported in 7.9+"
66

77
- do:
88
indices.create_data_stream:
@@ -55,8 +55,8 @@
5555
---
5656
"Attempt to delete write index on data stream is rejected":
5757
- skip:
58-
version: " - 7.99.99"
59-
reason: "enable in 7.8+ after backporting"
58+
version: " - 7.8.99"
59+
reason: "data streams only supported in 7.9+"
6060

6161
- do:
6262
indices.create_data_stream:

rest-api-spec/src/main/resources/rest-api-spec/test/indices.get/20_backing_indices.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
---
22
"Get backing indices for data stream":
33
- skip:
4-
version: " - 7.99.99"
5-
reason: "enable in 7.8+ after backporting"
4+
version: " - 7.8.99"
5+
reason: "data streams only supported in 7.9+"
66

77
- do:
88
indices.create_data_stream:

rest-api-spec/src/main/resources/rest-api-spec/test/indices.rollover/50_data_streams.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
---
22
"Roll over a data stream":
33
- skip:
4-
version: " - 7.99.99"
5-
reason: "enable in 7.8+ after backporting"
4+
version: " - 7.8.99"
5+
reason: "data streams only supported in 7.9+"
66

77
- do:
88
indices.create_data_stream:

server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkIntegrationIT.java

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,25 @@
2424
import org.elasticsearch.Version;
2525
import org.elasticsearch.action.ActionRequestValidationException;
2626
import org.elasticsearch.action.admin.indices.alias.Alias;
27+
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction;
28+
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
29+
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
2730
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
31+
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateV2Action;
32+
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
33+
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateV2Action;
2834
import org.elasticsearch.action.index.IndexRequest;
2935
import org.elasticsearch.action.index.IndexResponse;
3036
import org.elasticsearch.action.ingest.PutPipelineRequest;
3137
import org.elasticsearch.action.support.master.AcknowledgedResponse;
3238
import org.elasticsearch.action.support.replication.ReplicationRequest;
39+
import org.elasticsearch.cluster.metadata.DataStream;
3340
import org.elasticsearch.cluster.metadata.IndexMetadata;
41+
import org.elasticsearch.cluster.metadata.IndexTemplateV2;
42+
import org.elasticsearch.cluster.metadata.Template;
43+
import org.elasticsearch.common.Strings;
3444
import org.elasticsearch.common.bytes.BytesReference;
45+
import org.elasticsearch.common.settings.Settings;
3546
import org.elasticsearch.common.xcontent.XContentBuilder;
3647
import org.elasticsearch.common.xcontent.XContentType;
3748
import org.elasticsearch.ingest.IngestTestPlugin;
@@ -44,20 +55,25 @@
4455
import java.util.Arrays;
4556
import java.util.Collection;
4657
import java.util.Collections;
58+
import java.util.Comparator;
4759
import java.util.Map;
4860
import java.util.concurrent.ExecutionException;
4961
import java.util.concurrent.atomic.AtomicBoolean;
5062
import java.util.concurrent.atomic.AtomicInteger;
5163

64+
import static org.elasticsearch.action.DocWriteRequest.OpType.CREATE;
5265
import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
5366
import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
5467
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
5568
import static org.elasticsearch.test.StreamsUtils.copyToStringFromClasspath;
5669
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
70+
import static org.hamcrest.Matchers.arrayWithSize;
5771
import static org.hamcrest.Matchers.containsInAnyOrder;
5872
import static org.hamcrest.Matchers.containsString;
5973
import static org.hamcrest.Matchers.equalTo;
6074
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
75+
import static org.hamcrest.Matchers.hasItemInArray;
76+
import static org.hamcrest.Matchers.hasSize;
6177
import static org.hamcrest.Matchers.is;
6278
import static org.hamcrest.Matchers.oneOf;
6379

@@ -202,4 +218,87 @@ public void testDeleteIndexWhileIndexing() throws Exception {
202218
assertFalse(thread.isAlive());
203219
}
204220
}
221+
222+
public void testMixedAutoCreate() {
223+
Settings settings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build();
224+
225+
PutIndexTemplateV2Action.Request createTemplateRequest = new PutIndexTemplateV2Action.Request("logs-foo");
226+
createTemplateRequest.indexTemplate(
227+
new IndexTemplateV2(
228+
Collections.singletonList("logs-foo*"),
229+
new Template(settings, null, null),
230+
null, null, null, null,
231+
new IndexTemplateV2.DataStreamTemplate("@timestamp"))
232+
);
233+
client().execute(PutIndexTemplateV2Action.INSTANCE, createTemplateRequest).actionGet();
234+
235+
BulkRequest bulkRequest = new BulkRequest();
236+
bulkRequest.add(new IndexRequest("logs-foobar").opType(CREATE).source("{}", XContentType.JSON));
237+
bulkRequest.add(new IndexRequest("logs-foobaz").opType(CREATE).source("{}", XContentType.JSON));
238+
bulkRequest.add(new IndexRequest("logs-barbaz").opType(CREATE).source("{}", XContentType.JSON));
239+
bulkRequest.add(new IndexRequest("logs-barfoo").opType(CREATE).source("{}", XContentType.JSON));
240+
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
241+
assertThat("bulk failures: " + Strings.toString(bulkResponse), bulkResponse.hasFailures(), is(false));
242+
243+
bulkRequest = new BulkRequest();
244+
bulkRequest.add(new IndexRequest("logs-foobar").opType(CREATE).source("{}", XContentType.JSON));
245+
bulkRequest.add(new IndexRequest("logs-foobaz2").opType(CREATE).source("{}", XContentType.JSON));
246+
bulkRequest.add(new IndexRequest("logs-barbaz").opType(CREATE).source("{}", XContentType.JSON));
247+
bulkRequest.add(new IndexRequest("logs-barfoo2").opType(CREATE).source("{}", XContentType.JSON));
248+
bulkResponse = client().bulk(bulkRequest).actionGet();
249+
assertThat("bulk failures: " + Strings.toString(bulkResponse), bulkResponse.hasFailures(), is(false));
250+
251+
bulkRequest = new BulkRequest();
252+
bulkRequest.add(new IndexRequest("logs-foobar").opType(CREATE).source("{}", XContentType.JSON));
253+
bulkRequest.add(new IndexRequest("logs-foobaz2").opType(CREATE).source("{}", XContentType.JSON));
254+
bulkRequest.add(new IndexRequest("logs-foobaz3").opType(CREATE).source("{}", XContentType.JSON));
255+
bulkRequest.add(new IndexRequest("logs-barbaz").opType(CREATE).source("{}", XContentType.JSON));
256+
bulkRequest.add(new IndexRequest("logs-barfoo2").opType(CREATE).source("{}", XContentType.JSON));
257+
bulkRequest.add(new IndexRequest("logs-barfoo3").opType(CREATE).source("{}", XContentType.JSON));
258+
bulkResponse = client().bulk(bulkRequest).actionGet();
259+
assertThat("bulk failures: " + Strings.toString(bulkResponse), bulkResponse.hasFailures(), is(false));
260+
261+
GetDataStreamsAction.Request getDataStreamRequest = new GetDataStreamsAction.Request("*");
262+
GetDataStreamsAction.Response getDataStreamsResponse = client().admin().indices().getDataStreams(getDataStreamRequest).actionGet();
263+
assertThat(getDataStreamsResponse.getDataStreams(), hasSize(4));
264+
getDataStreamsResponse.getDataStreams().sort(Comparator.comparing(DataStream::getName));
265+
assertThat(getDataStreamsResponse.getDataStreams().get(0).getName(), equalTo("logs-foobar"));
266+
assertThat(getDataStreamsResponse.getDataStreams().get(1).getName(), equalTo("logs-foobaz"));
267+
assertThat(getDataStreamsResponse.getDataStreams().get(2).getName(), equalTo("logs-foobaz2"));
268+
assertThat(getDataStreamsResponse.getDataStreams().get(3).getName(), equalTo("logs-foobaz3"));
269+
270+
GetIndexResponse getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices("logs-bar*")).actionGet();
271+
assertThat(getIndexResponse.getIndices(), arrayWithSize(4));
272+
assertThat(getIndexResponse.getIndices(), hasItemInArray("logs-barbaz"));
273+
assertThat(getIndexResponse.getIndices(), hasItemInArray("logs-barfoo"));
274+
assertThat(getIndexResponse.getIndices(), hasItemInArray("logs-barfoo2"));
275+
assertThat(getIndexResponse.getIndices(), hasItemInArray("logs-barfoo3"));
276+
277+
DeleteIndexTemplateV2Action.Request deleteTemplateRequest = new DeleteIndexTemplateV2Action.Request("*");
278+
client().execute(DeleteIndexTemplateV2Action.INSTANCE, deleteTemplateRequest).actionGet();
279+
}
280+
281+
public void testAutoCreateV1TemplateNoDataStream() {
282+
Settings settings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build();
283+
284+
PutIndexTemplateRequest v1Request = new PutIndexTemplateRequest("logs-foo");
285+
v1Request.patterns(Collections.singletonList("logs-foo*"));
286+
v1Request.settings(settings);
287+
v1Request.order(Integer.MAX_VALUE); // in order to avoid number_of_replicas being overwritten by random_template
288+
client().admin().indices().putTemplate(v1Request).actionGet();
289+
290+
BulkRequest bulkRequest = new BulkRequest();
291+
bulkRequest.add(new IndexRequest("logs-foobar").opType(CREATE).source("{}", XContentType.JSON));
292+
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
293+
assertThat("bulk failures: " + Strings.toString(bulkResponse), bulkResponse.hasFailures(), is(false));
294+
295+
GetDataStreamsAction.Request getDataStreamRequest = new GetDataStreamsAction.Request("*");
296+
GetDataStreamsAction.Response getDataStreamsResponse = client().admin().indices().getDataStreams(getDataStreamRequest).actionGet();
297+
assertThat(getDataStreamsResponse.getDataStreams(), hasSize(0));
298+
299+
GetIndexResponse getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices("logs-foobar")).actionGet();
300+
assertThat(getIndexResponse.getIndices(), arrayWithSize(1));
301+
assertThat(getIndexResponse.getIndices(), hasItemInArray("logs-foobar"));
302+
assertThat(getIndexResponse.getSettings().get("logs-foobar").get(IndexMetadata.SETTING_NUMBER_OF_REPLICAS), equalTo("0"));
303+
}
205304
}

0 commit comments

Comments
 (0)