Skip to content

Commit dc703d7

Browse files
authored
Add explicit generation attribute to data streams
1 parent 7d5f74e commit dc703d7

File tree

12 files changed

+69
-46
lines changed

12 files changed

+69
-46
lines changed

rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@
2323
name: "*"
2424
- match: { 0.name: simple-data-stream1 }
2525
- match: { 0.timestamp_field: '@timestamp' }
26+
- match: { 0.generation: 1 }
2627
- length: { 0.indices: 1 }
2728
- match: { 0.indices.0.index_name: 'simple-data-stream1-000001' }
2829
- match: { 1.name: simple-data-stream2 }
2930
- match: { 1.timestamp_field: '@timestamp2' }
31+
- match: { 0.generation: 1 }
3032
- length: { 1.indices: 1 }
3133
- match: { 1.indices.0.index_name: 'simple-data-stream2-000001' }
3234

@@ -97,22 +99,27 @@
9799
indices.get_data_streams: {}
98100
- match: { 0.name: get-data-stream1 }
99101
- match: { 0.timestamp_field: '@timestamp' }
102+
- match: { 0.generation: 1 }
100103
- match: { 1.name: get-data-stream2 }
101104
- match: { 1.timestamp_field: '@timestamp2' }
105+
- match: { 1.generation: 1 }
102106

103107
- do:
104108
indices.get_data_streams:
105109
name: get-data-stream1
106110
- match: { 0.name: get-data-stream1 }
107111
- match: { 0.timestamp_field: '@timestamp' }
112+
- match: { 0.generation: 1 }
108113

109114
- do:
110115
indices.get_data_streams:
111116
name: get-data-*
112117
- match: { 0.name: get-data-stream1 }
113118
- match: { 0.timestamp_field: '@timestamp' }
119+
- match: { 0.generation: 1 }
114120
- match: { 1.name: get-data-stream2 }
115121
- match: { 1.timestamp_field: '@timestamp2' }
122+
- match: { 1.generation: 1 }
116123

117124
- do:
118125
indices.get_data_streams:
@@ -169,6 +176,7 @@
169176
indices.get_data_streams: {}
170177
- match: { 0.name: delete-data-stream1 }
171178
- match: { 0.timestamp_field: '@timestamp' }
179+
- match: { 0.generation: 1 }
172180
- length: { 0.indices: 1 }
173181
- match: { 0.indices.0.index_name: 'delete-data-stream1-000001' }
174182

server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIn
177177
MetadataCreateIndexService.validateIndexOrAliasName(request.name,
178178
(s1, s2) -> new IllegalArgumentException("data_stream [" + s1 + "] " + s2));
179179

180-
String firstBackingIndexName = request.name + "-000001";
180+
String firstBackingIndexName = DataStream.getBackingIndexName(request.name, 1);
181181
CreateIndexClusterStateUpdateRequest createIndexRequest =
182182
new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName)
183183
.settings(Settings.builder().put("index.hidden", true).build());

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,25 @@
3131

3232
import java.io.IOException;
3333
import java.util.List;
34+
import java.util.Locale;
3435
import java.util.Objects;
3536

3637
public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject {
3738

3839
private final String name;
3940
private final String timeStampField;
4041
private final List<Index> indices;
42+
private long generation;
4143

42-
public DataStream(String name, String timeStampField, List<Index> indices) {
44+
public DataStream(String name, String timeStampField, List<Index> indices, long generation) {
4345
this.name = name;
4446
this.timeStampField = timeStampField;
4547
this.indices = indices;
48+
this.generation = generation;
49+
}
50+
51+
public DataStream(String name, String timeStampField, List<Index> indices) {
52+
this(name, timeStampField, indices, indices.size());
4653
}
4754

4855
public String getName() {
@@ -57,8 +64,16 @@ public List<Index> getIndices() {
5764
return indices;
5865
}
5966

67+
public long getGeneration() {
68+
return generation;
69+
}
70+
71+
public static String getBackingIndexName(String dataStreamName, long generation) {
72+
return String.format(Locale.ROOT, "%s-%06d", dataStreamName, generation);
73+
}
74+
6075
public DataStream(StreamInput in) throws IOException {
61-
this(in.readString(), in.readString(), in.readList(Index::new));
76+
this(in.readString(), in.readString(), in.readList(Index::new), in.readVLong());
6277
}
6378

6479
public static Diff<DataStream> readDiffFrom(StreamInput in) throws IOException {
@@ -70,20 +85,23 @@ public void writeTo(StreamOutput out) throws IOException {
7085
out.writeString(name);
7186
out.writeString(timeStampField);
7287
out.writeList(indices);
88+
out.writeVLong(generation);
7389
}
7490

7591
public static final ParseField NAME_FIELD = new ParseField("name");
7692
public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field");
7793
public static final ParseField INDICES_FIELD = new ParseField("indices");
94+
public static final ParseField GENERATION_FIELD = new ParseField("generation");
7895

7996
@SuppressWarnings("unchecked")
8097
private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream",
81-
args -> new DataStream((String) args[0], (String) args[1], (List<Index>) args[2]));
98+
args -> new DataStream((String) args[0], (String) args[1], (List<Index>) args[2], (Long) args[3]));
8299

83100
static {
84101
PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD);
85102
PARSER.declareString(ConstructingObjectParser.constructorArg(), TIMESTAMP_FIELD_FIELD);
86103
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> Index.fromXContent(p), INDICES_FIELD);
104+
PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_FIELD);
87105
}
88106

89107
public static DataStream fromXContent(XContentParser parser) throws IOException {
@@ -96,6 +114,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
96114
builder.field(NAME_FIELD.getPreferredName(), name);
97115
builder.field(TIMESTAMP_FIELD_FIELD.getPreferredName(), timeStampField);
98116
builder.field(INDICES_FIELD.getPreferredName(), indices);
117+
builder.field(GENERATION_FIELD.getPreferredName(), generation);
99118
builder.endObject();
100119
return builder;
101120
}
@@ -107,11 +126,12 @@ public boolean equals(Object o) {
107126
DataStream that = (DataStream) o;
108127
return name.equals(that.name) &&
109128
timeStampField.equals(that.timeStampField) &&
110-
indices.equals(that.indices);
129+
indices.equals(that.indices) &&
130+
generation == that.generation;
111131
}
112132

113133
@Override
114134
public int hashCode() {
115-
return Objects.hash(name, timeStampField, indices);
135+
return Objects.hash(name, timeStampField, indices, generation);
116136
}
117137
}

server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Objects;
3232
import java.util.stream.Collectors;
3333

34+
import static org.elasticsearch.cluster.metadata.DataStream.getBackingIndexName;
3435
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_HIDDEN_SETTING;
3536
import static org.elasticsearch.common.collect.List.copyOf;
3637

@@ -266,12 +267,11 @@ class DataStream implements IndexAbstraction {
266267
private final List<IndexMetadata> dataStreamIndices;
267268
private final IndexMetadata writeIndex;
268269

269-
public DataStream(org.elasticsearch.cluster.metadata.DataStream dataStream,
270-
List<IndexMetadata> dataStreamIndices, IndexMetadata writeIndex) {
270+
public DataStream(org.elasticsearch.cluster.metadata.DataStream dataStream, List<IndexMetadata> dataStreamIndices) {
271271
this.dataStream = dataStream;
272272
this.dataStreamIndices = copyOf(dataStreamIndices);
273-
this.writeIndex = writeIndex;
274-
assert dataStreamIndices.contains(writeIndex);
273+
this.writeIndex = dataStreamIndices.get(dataStreamIndices.size() - 1);
274+
assert writeIndex.getIndex().getName().equals(getBackingIndexName(dataStream.getName(), dataStream.getGeneration()));
275275
}
276276

277277
@Override

server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1458,9 +1458,8 @@ private SortedMap<String, IndexAbstraction> buildIndicesLookup() {
14581458
assert backingIndices.isEmpty() == false;
14591459
assert backingIndices.contains(null) == false;
14601460

1461-
IndexMetadata writeIndex = backingIndices.get(backingIndices.size() - 1);
14621461
IndexAbstraction existing = indicesLookup.put(dataStream.getName(),
1463-
new IndexAbstraction.DataStream(dataStream, backingIndices, writeIndex));
1462+
new IndexAbstraction.DataStream(dataStream, backingIndices));
14641463
if (existing != null) {
14651464
throw new IllegalStateException("data stream [" + dataStream.getName() +
14661465
"] conflicts with existing " + existing.getType().getDisplayName() + " [" + existing.getName() + "]");

server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,9 @@ public void testCreateDataStream() throws Exception {
8080
ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req);
8181
assertThat(newState.metadata().dataStreams().size(), equalTo(1));
8282
assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
83-
assertThat(newState.metadata().index(dataStreamName + "-000001"), notNullValue());
84-
assertThat(newState.metadata().index(dataStreamName + "-000001").getSettings().get("index.hidden"), equalTo("true"));
83+
assertThat(newState.metadata().index(DataStream.getBackingIndexName(dataStreamName, 1)), notNullValue());
84+
assertThat(newState.metadata().index(DataStream.getBackingIndexName(dataStreamName, 1)).getSettings().get("index.hidden"),
85+
equalTo("true"));
8586
}
8687

8788
public void testCreateDuplicateDataStream() throws Exception {

server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636

3737
import java.util.ArrayList;
3838
import java.util.List;
39-
import java.util.Locale;
4039
import java.util.Set;
4140
import java.util.stream.Collectors;
4241

@@ -75,13 +74,7 @@ public void testValidateRequestWithoutName() {
7574
public void testDeleteDataStream() {
7675
final String dataStreamName = "my-data-stream";
7776
final List<String> otherIndices = randomSubsetOf(org.elasticsearch.common.collect.List.of("foo", "bar", "baz"));
78-
79-
ClusterState cs = getClusterState(
80-
org.elasticsearch.common.collect.List.of(new Tuple<>(dataStreamName, org.elasticsearch.common.collect.List.of(
81-
String.format(Locale.ROOT, "%s-%06d", dataStreamName, 1),
82-
String.format(Locale.ROOT, "%s-%06d", dataStreamName, 2)))),
83-
otherIndices);
84-
77+
ClusterState cs = getClusterState(org.elasticsearch.common.collect.List.of(new Tuple<>(dataStreamName, 2)), otherIndices);
8578
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(dataStreamName);
8679
ClusterState newState = DeleteDataStreamAction.TransportAction.removeDataStream(getMetadataDeleteIndexService(), cs, req);
8780
assertThat(newState.metadata().dataStreams().size(), equalTo(0));
@@ -122,22 +115,22 @@ private static MetadataDeleteIndexService getMetadataDeleteIndexService() {
122115
/**
123116
* Constructs {@code ClusterState} with the specified data streams and indices.
124117
*
125-
* @param dataStreamAndIndexNames The names of the data streams to create with their respective backing indices
126-
* @param indexNames The names of indices to create that do not back any data streams
118+
* @param dataStreams The names of the data streams to create with their respective number of backing indices
119+
* @param indexNames The names of indices to create that do not back any data streams
127120
*/
128-
private static ClusterState getClusterState(List<Tuple<String, List<String>>> dataStreamAndIndexNames, List<String> indexNames) {
121+
private static ClusterState getClusterState(List<Tuple<String, Integer>> dataStreams, List<String> indexNames) {
129122
Metadata.Builder builder = Metadata.builder();
130123

131124
List<IndexMetadata> allIndices = new ArrayList<>();
132-
for (Tuple<String, List<String>> dsTuple : dataStreamAndIndexNames) {
125+
for (Tuple<String, Integer> dsTuple : dataStreams) {
133126
List<IndexMetadata> backingIndices = new ArrayList<>();
134-
for (String indexName : dsTuple.v2()) {
135-
backingIndices.add(createIndexMetadata(indexName, true));
127+
for (int backingIndexNumber = 1; backingIndexNumber <= dsTuple.v2(); backingIndexNumber++) {
128+
backingIndices.add(createIndexMetadata(DataStream.getBackingIndexName(dsTuple.v1(), backingIndexNumber), true));
136129
}
137130
allIndices.addAll(backingIndices);
138131

139132
DataStream ds = new DataStream(dsTuple.v1(), "@timestamp",
140-
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()));
133+
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()), dsTuple.v2());
141134
builder.put(ds);
142135
}
143136

server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,11 @@ public static List<Index> randomIndexInstances() {
4040
}
4141

4242
public static DataStream randomInstance() {
43-
return new DataStream(randomAlphaOfLength(10), randomAlphaOfLength(10), randomIndexInstances());
43+
List<Index> indices = randomIndexInstances();
44+
long generation = randomLongBetween(1, 128);
45+
String dataStreamName = randomAlphaOfLength(10);
46+
indices.add(new Index(DataStream.getBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(random())));
47+
return new DataStream(dataStreamName, randomAlphaOfLength(10), indices, generation);
4448
}
4549

4650
@Override

server/src/test/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolverTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1769,7 +1769,7 @@ public void testDataStreams() {
17691769
Metadata.Builder mdBuilder = Metadata.builder()
17701770
.put(index1, false)
17711771
.put(index2, false)
1772-
.put(new DataStream(dataStreamName, "ts", org.elasticsearch.common.collect.List.of(index1.getIndex(), index2.getIndex())));
1772+
.put(new DataStream(dataStreamName, "ts", org.elasticsearch.common.collect.List.of(index1.getIndex(), index2.getIndex()), 2));
17731773
ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build();
17741774

17751775
{

server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import java.util.HashMap;
4949
import java.util.HashSet;
5050
import java.util.List;
51-
import java.util.Locale;
5251
import java.util.Map;
5352
import java.util.Set;
5453

@@ -965,7 +964,7 @@ public void testBuilderRejectsDataStreamThatConflictsWithIndex() {
965964

966965
public void testBuilderRejectsDataStreamThatConflictsWithAlias() {
967966
final String dataStreamName = "my-data-stream";
968-
IndexMetadata idx = createFirstBackingIndex(dataStreamName + "z")
967+
IndexMetadata idx = createFirstBackingIndex(dataStreamName)
969968
.putAlias(AliasMetadata.builder(dataStreamName).build())
970969
.build();
971970
Metadata.Builder b = Metadata.builder()
@@ -980,7 +979,7 @@ public void testBuilderRejectsDataStreamThatConflictsWithAlias() {
980979
public void testBuilderRejectsDataStreamWithConflictingBackingIndices() {
981980
final String dataStreamName = "my-data-stream";
982981
IndexMetadata validIdx = createFirstBackingIndex(dataStreamName).build();
983-
final String conflictingIndex = dataStreamName + "-000002";
982+
final String conflictingIndex = DataStream.getBackingIndexName(dataStreamName, 2);
984983
IndexMetadata invalidIdx = createBackingIndex(dataStreamName, 2).build();
985984
Metadata.Builder b = Metadata.builder()
986985
.put(validIdx, false)
@@ -994,7 +993,7 @@ public void testBuilderRejectsDataStreamWithConflictingBackingIndices() {
994993

995994
public void testBuilderRejectsDataStreamWithConflictingBackingAlias() {
996995
final String dataStreamName = "my-data-stream";
997-
final String conflictingName = dataStreamName + "-000002";
996+
final String conflictingName = DataStream.getBackingIndexName(dataStreamName, 2);
998997
IndexMetadata idx = createFirstBackingIndex(dataStreamName)
999998
.putAlias(new AliasMetadata.Builder(conflictingName))
1000999
.build();
@@ -1011,20 +1010,20 @@ public void testBuilderForDataStreamWithRandomlyNumberedBackingIndices() {
10111010
final String dataStreamName = "my-data-stream";
10121011
final List<Index> backingIndices = new ArrayList<>();
10131012
final int numBackingIndices = randomIntBetween(2, 5);
1014-
int lastBackingIndexNum = randomIntBetween(9, 50);
1013+
int lastBackingIndexNum = 0;
10151014
Metadata.Builder b = Metadata.builder();
10161015
for (int k = 1; k <= numBackingIndices; k++) {
1017-
IndexMetadata im = IndexMetadata.builder(String.format(Locale.ROOT, "%s-%06d", dataStreamName, lastBackingIndexNum))
1016+
lastBackingIndexNum = randomIntBetween(lastBackingIndexNum + 1, lastBackingIndexNum + 50);
1017+
IndexMetadata im = IndexMetadata.builder(DataStream.getBackingIndexName(dataStreamName, lastBackingIndexNum))
10181018
.settings(settings(Version.CURRENT))
10191019
.numberOfShards(1)
10201020
.numberOfReplicas(1)
10211021
.build();
10221022
b.put(im, false);
10231023
backingIndices.add(im.getIndex());
1024-
lastBackingIndexNum = randomIntBetween(lastBackingIndexNum + 1, lastBackingIndexNum + 50);
10251024
}
10261025

1027-
b.put(new DataStream(dataStreamName, "ts", backingIndices));
1026+
b.put(new DataStream(dataStreamName, "ts", backingIndices, lastBackingIndexNum));
10281027
Metadata metadata = b.build();
10291028
assertThat(metadata.dataStreams().size(), equalTo(1));
10301029
assertThat(metadata.dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
@@ -1042,7 +1041,7 @@ public void testBuildIndicesLookupForDataStreams() {
10421041
indices.add(idx.getIndex());
10431042
b.put(idx, true);
10441043
}
1045-
b.put(new DataStream(name, "ts", indices));
1044+
b.put(new DataStream(name, "ts", indices, indices.size()));
10461045
}
10471046

10481047
Metadata metadata = b.build();

0 commit comments

Comments
 (0)