Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,23 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.indices.SystemDataStreamDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -72,6 +77,15 @@ protected void masterOperation(
ClusterState state,
ActionListener<GetDataStreamAction.Response> listener
) throws Exception {
listener.onResponse(innerOperation(state, request, indexNameExpressionResolver, systemIndices));
}

static GetDataStreamAction.Response innerOperation(
ClusterState state,
GetDataStreamAction.Request request,
IndexNameExpressionResolver indexNameExpressionResolver,
SystemIndices systemIndices
) {
List<DataStream> dataStreams = getDataStreams(state, indexNameExpressionResolver, request);
List<GetDataStreamAction.Response.DataStreamInfo> dataStreamInfos = new ArrayList<>(dataStreams.size());
for (DataStream dataStream : dataStreams) {
Expand Down Expand Up @@ -105,11 +119,53 @@ protected void masterOperation(
state,
dataStream.getIndices().stream().map(Index::getName).toArray(String[]::new)
);

GetDataStreamAction.Response.TimeSeries timeSeries = null;
if (dataStream.getIndexMode() == IndexMode.TIME_SERIES) {
List<Tuple<Instant, Instant>> ranges = new ArrayList<>();
Tuple<Instant, Instant> current = null;
for (Index index : dataStream.getIndices()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you fail if the indices aren't in time series order? Or warn or something? Just out of paranoia.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or sort them by start time.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The indices are in order in which they are created. (new data stream, rollover, rollover again -> 3 backing indices that are in order). So the start / end times should be in order.

I will add an assertion and ensure a warning is logged if this ever occurs in production, which I don't expect to happen.

IndexMetadata metadata = state.getMetadata().index(index);
Instant start = IndexSettings.TIME_SERIES_START_TIME.get(metadata.getSettings());
Instant end = IndexSettings.TIME_SERIES_END_TIME.get(metadata.getSettings());
if (current == null) {
current = new Tuple<>(start, end);
} else if (current.v2().compareTo(start) == 0) {
current = new Tuple<>(current.v1(), end);
} else if (current.v2().compareTo(start) < 0) {
ranges.add(current);
current = new Tuple<>(start, end);
} else {
String message = "previous backing index ["
+ current.v1()
+ "/"
+ current.v2()
+ "] range is colliding with current backing index range ["
+ start
+ "/"
+ end
+ "]";
assert current.v2().compareTo(start) < 0 : message;
LOGGER.warn(message);
}
}
if (current != null) {
ranges.add(current);
}
timeSeries = new GetDataStreamAction.Response.TimeSeries(ranges);
}

dataStreamInfos.add(
new GetDataStreamAction.Response.DataStreamInfo(dataStream, streamHealth.getStatus(), indexTemplate, ilmPolicyName)
new GetDataStreamAction.Response.DataStreamInfo(
dataStream,
streamHealth.getStatus(),
indexTemplate,
ilmPolicyName,
timeSeries
)
);
}
listener.onResponse(new GetDataStreamAction.Response(dataStreamInfos));
return new GetDataStreamAction.Response(dataStreamInfos);
}

static List<DataStream> getDataStreams(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.test.AbstractWireSerializingTestCase;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -28,12 +30,22 @@ protected Response createTestInstance() {
int numDataStreams = randomIntBetween(0, 8);
List<Response.DataStreamInfo> dataStreams = new ArrayList<>();
for (int i = 0; i < numDataStreams; i++) {
List<Tuple<Instant, Instant>> timeSeries = null;
if (randomBoolean()) {
timeSeries = new ArrayList<>();
int numTimeSeries = randomIntBetween(0, 3);
for (int j = 0; j < numTimeSeries; j++) {
timeSeries.add(new Tuple<>(Instant.now(), Instant.now()));
}
}

dataStreams.add(
new Response.DataStreamInfo(
DataStreamTestHelper.randomInstance(),
ClusterHealthStatus.GREEN,
randomAlphaOfLengthBetween(2, 10),
randomAlphaOfLengthBetween(2, 10)
randomAlphaOfLengthBetween(2, 10),
timeSeries != null ? new Response.TimeSeries(timeSeries) : null
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,30 @@
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
import org.elasticsearch.test.ESTestCase;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.getClusterStateWithDataStreams;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

public class GetDataStreamsTransportActionTests extends ESTestCase {

private final IndexNameExpressionResolver resolver = TestIndexNameExpressionResolver.newInstance();
private final SystemIndices systemIndices = new SystemIndices(Map.of());

public void testGetDataStream() {
final String dataStreamName = "my-data-stream";
Expand Down Expand Up @@ -107,4 +116,64 @@ public void testGetNonexistentDataStream() {
assertThat(e.getMessage(), containsString("no such index [" + dataStreamName + "]"));
}

public void testGetTimeSeriesDataStream() {
    Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS);
    String firstDataStream = "ds-1";
    String secondDataStream = "ds-2";
    Instant minus6h = now.minus(6, ChronoUnit.HOURS);
    Instant minus4h = now.minus(4, ChronoUnit.HOURS);
    Instant minus2h = now.minus(2, ChronoUnit.HOURS);
    Instant plus2h = now.plus(2, ChronoUnit.HOURS);
    // Three contiguous backing-index ranges: adjacent boundaries should be merged
    // into a single temporal range by the action.
    List<Tuple<Instant, Instant>> backingRanges = List.of(
        new Tuple<>(minus6h, minus4h),
        new Tuple<>(minus4h, minus2h),
        new Tuple<>(minus2h, plus2h)
    );

    ClusterState state;
    {
        var metadataBuilder = new Metadata.Builder();
        DataStreamTestHelper.getClusterStateWithDataStream(metadataBuilder, firstDataStream, backingRanges);
        DataStreamTestHelper.getClusterStateWithDataStream(metadataBuilder, secondDataStream, backingRanges);
        state = ClusterState.builder(new ClusterName("_name")).metadata(metadataBuilder).build();
    }

    var request = new GetDataStreamAction.Request(new String[] {});
    var response = GetDataStreamsTransportAction.innerOperation(state, request, resolver, systemIndices);
    assertThat(response.getDataStreams(), hasSize(2));
    assertThat(response.getDataStreams().get(0).getDataStream().getName(), equalTo(firstDataStream));
    assertThat(response.getDataStreams().get(0).getTimeSeries().temporalRanges(), contains(new Tuple<>(minus6h, plus2h)));
    assertThat(response.getDataStreams().get(1).getDataStream().getName(), equalTo(secondDataStream));
    assertThat(response.getDataStreams().get(1).getTimeSeries().temporalRanges(), contains(new Tuple<>(minus6h, plus2h)));

    // Drop the middle backing index of the first data stream, creating a gap:
    {
        DataStream dataStream = state.getMetadata().dataStreams().get(firstDataStream);
        var middleIndex = dataStream.getIndices().get(1);
        Metadata.Builder metadataBuilder = Metadata.builder(state.getMetadata());
        metadataBuilder.put(dataStream.removeBackingIndex(middleIndex));
        metadataBuilder.remove(middleIndex.getName());
        state = ClusterState.builder(state).metadata(metadataBuilder).build();
    }
    response = GetDataStreamsTransportAction.innerOperation(state, request, resolver, systemIndices);
    assertThat(response.getDataStreams(), hasSize(2));
    assertThat(response.getDataStreams().get(0).getDataStream().getName(), equalTo(firstDataStream));
    // The gap splits the first data stream's coverage into two ranges.
    assertThat(
        response.getDataStreams().get(0).getTimeSeries().temporalRanges(),
        contains(new Tuple<>(minus6h, minus4h), new Tuple<>(minus2h, plus2h))
    );
    assertThat(response.getDataStreams().get(1).getDataStream().getName(), equalTo(secondDataStream));
    assertThat(response.getDataStreams().get(1).getTimeSeries().temporalRanges(), contains(new Tuple<>(minus6h, plus2h)));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ created the data stream:
- match: { data_streams.0.template: 'my-template1' }
- match: { data_streams.0.hidden: false }
- match: { data_streams.0.system: false }
- match: { data_streams.0.time_series.temporal_ranges.0.start: 2021-04-28T00:00:00.000Z }
- match: { data_streams.0.time_series.temporal_ranges.0.end: 2021-04-29T00:00:00.000Z }
- set: { data_streams.0.indices.0.index_name: backing_index }

- do:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package org.elasticsearch.action.datastreams;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
Expand All @@ -18,12 +19,16 @@
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -122,28 +127,42 @@ public static class DataStreamInfo implements SimpleDiffable<DataStreamInfo>, To
public static final ParseField SYSTEM_FIELD = new ParseField("system");
public static final ParseField ALLOW_CUSTOM_ROUTING = new ParseField("allow_custom_routing");
public static final ParseField REPLICATED = new ParseField("replicated");
public static final ParseField TIME_SERIES = new ParseField("time_series");
public static final ParseField TEMPORAL_RANGES = new ParseField("temporal_ranges");
public static final ParseField TEMPORAL_RANGE_START = new ParseField("start");
public static final ParseField TEMPORAL_RANGE_END = new ParseField("end");

DataStream dataStream;
ClusterHealthStatus dataStreamStatus;
private final DataStream dataStream;
private final ClusterHealthStatus dataStreamStatus;
@Nullable
String indexTemplate;
private final String indexTemplate;
@Nullable
String ilmPolicyName;
private final String ilmPolicyName;
@Nullable
private final TimeSeries timeSeries;

/**
 * Creates an info holder describing one data stream in a get-data-streams response.
 *
 * @param dataStream       the data stream being described
 * @param dataStreamStatus health status derived from the backing indices
 * @param indexTemplate    name of the matching composable template, or {@code null}
 * @param ilmPolicyName    name of the ILM policy in use, or {@code null}
 * @param timeSeries       temporal range info, or {@code null} when the data stream
 *                         is not a time series data stream
 */
public DataStreamInfo(
    DataStream dataStream,
    ClusterHealthStatus dataStreamStatus,
    @Nullable String indexTemplate,
    @Nullable String ilmPolicyName,
    @Nullable TimeSeries timeSeries
) {
    this.timeSeries = timeSeries;
    this.ilmPolicyName = ilmPolicyName;
    this.indexTemplate = indexTemplate;
    this.dataStreamStatus = dataStreamStatus;
    this.dataStream = dataStream;
}

/**
 * Deserializes a {@link DataStreamInfo} from the wire.
 * The time series section only exists on the wire from 8.3.0 onwards;
 * when reading from an older node it is {@code null}.
 */
DataStreamInfo(StreamInput in) throws IOException {
    this(
        new DataStream(in),
        ClusterHealthStatus.readFrom(in),
        in.readOptionalString(),
        in.readOptionalString(),
        in.getVersion().onOrAfter(Version.V_8_3_0) ? in.readOptionalWriteable(TimeSeries::new) : null
    );
}

public DataStream getDataStream() {
Expand All @@ -164,12 +183,20 @@ public String getIlmPolicy() {
return ilmPolicyName;
}

/**
 * Returns the time series information for this data stream, or {@code null}
 * when no time series section was produced (e.g. the data stream is not in
 * time series index mode, or the response came from a pre-8.3.0 node).
 */
@Nullable
public TimeSeries getTimeSeries() {
    return timeSeries;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
    // Write order must mirror the read order in the StreamInput constructor.
    dataStream.writeTo(out);
    dataStreamStatus.writeTo(out);
    out.writeOptionalString(indexTemplate);
    out.writeOptionalString(ilmPolicyName);
    // Nodes before 8.3.0 do not know about the time series section, so it is
    // only written when the receiver is new enough to read it.
    if (out.getVersion().onOrAfter(Version.V_8_3_0)) {
        out.writeOptionalWriteable(timeSeries);
    }
}

@Override
Expand All @@ -193,6 +220,20 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(SYSTEM_FIELD.getPreferredName(), dataStream.isSystem());
builder.field(ALLOW_CUSTOM_ROUTING.getPreferredName(), dataStream.isAllowCustomRouting());
builder.field(REPLICATED.getPreferredName(), dataStream.isReplicated());
if (timeSeries != null) {
builder.startObject(TIME_SERIES.getPreferredName());
builder.startArray(TEMPORAL_RANGES.getPreferredName());
for (var range : timeSeries.temporalRanges()) {
builder.startObject();
Instant start = range.v1();
builder.field(TEMPORAL_RANGE_START.getPreferredName(), DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format(start));
Instant end = range.v2();
builder.field(TEMPORAL_RANGE_END.getPreferredName(), DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format(end));
builder.endObject();
}
builder.endArray();
builder.endObject();
}
builder.endObject();
return builder;
}
Expand All @@ -205,12 +246,41 @@ public boolean equals(Object o) {
return dataStream.equals(that.dataStream)
&& dataStreamStatus == that.dataStreamStatus
&& Objects.equals(indexTemplate, that.indexTemplate)
&& Objects.equals(ilmPolicyName, that.ilmPolicyName);
&& Objects.equals(ilmPolicyName, that.ilmPolicyName)
&& Objects.equals(timeSeries, that.timeSeries);
}

@Override
public int hashCode() {
    // Keep in sync with equals(): all five components participate, including
    // the nullable timeSeries (Objects.hash tolerates nulls).
    return Objects.hash(dataStream, dataStreamStatus, indexTemplate, ilmPolicyName, timeSeries);
}
}

/**
 * Time series information for a data stream: the temporal (start/end) ranges
 * covered by its backing indices. Adjacent index ranges are merged upstream,
 * so a gap in the list reflects a real gap in coverage.
 */
public static record TimeSeries(List<Tuple<Instant, Instant>> temporalRanges) implements Writeable {

    TimeSeries(StreamInput in) throws IOException {
        this(in.readList(in1 -> new Tuple<>(in1.readInstant(), in1.readInstant())));
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        // Mirror of the StreamInput constructor: each range is written as two instants.
        out.writeCollection(temporalRanges, (out1, value) -> {
            out1.writeInstant(value.v1());
            out1.writeInstant(value.v2());
        });
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        TimeSeries that = (TimeSeries) o;
        return temporalRanges.equals(that.temporalRanges);
    }

    @Override
    public int hashCode() {
        // Consistent with equals(): identity is the ranges alone.
        return Objects.hash(temporalRanges);
    }
}

Expand Down