diff --git a/server/src/main/resources/transport/definitions/referable/time_series_aggregate_timestamp.csv b/server/src/main/resources/transport/definitions/referable/time_series_aggregate_timestamp.csv new file mode 100644 index 0000000000000..017c60f0bb209 --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/time_series_aggregate_timestamp.csv @@ -0,0 +1 @@ +9257000 diff --git a/server/src/main/resources/transport/upper_bounds/9.4.csv b/server/src/main/resources/transport/upper_bounds/9.4.csv index 794078470a587..1239b5fb3bbda 100644 --- a/server/src/main/resources/transport/upper_bounds/9.4.csv +++ b/server/src/main/resources/transport/upper_bounds/9.4.csv @@ -1 +1 @@ -jina_ai_embedding_refactor,9256000 +time_series_aggregate_timestamp,9257000 diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeRestTest.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeRestTest.java index 2b4b1ed4f104c..38b5891d8dd24 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeRestTest.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeRestTest.java @@ -82,7 +82,7 @@ public abstract class GenerativeRestTest extends ESRestTestCase implements Query "implicit time-series aggregation function .* doesn't support type .*", "INLINE STATS .* can only be used after STATS when used with TS command", "cannot group by a metric field .* in a time-series aggregation", - "a @timestamp field of type date or date_nanos to be present when run with the TS command, but it was not present", + "a @timestamp field of type date or date_nanos to be present", "Output has changed from \\[.*\\] to \\[.*\\]" // https://github.com/elastic/elasticsearch/issues/134794 ); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec index 058c9d54a0470..227407fc61372 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec @@ -751,18 +751,6 @@ maxRate:double | tbucket:datetime 0.05898 | 2024-05-10T00:00:00.000Z ; -Rename timestamp when aggs don't require timestamp -required_capability: ts_command_v0 - -TS k8s -| RENAME @timestamp AS newTs -| STATS mx = max(max_over_time(network.eth0.tx)) BY tbucket = bucket(newTs, 1hour) -; - -mx:integer | tbucket:datetime -1716 | 2024-05-10T00:00:00.000Z -; - instant_selector_dimensions_promql required_capability: promql_pre_tech_preview_v12 diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index 09b72027239b4..aad167bec9a01 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -533,18 +533,19 @@ protected LogicalPlan rule(LogicalPlan plan, AnalyzerContext context) { }; } - private Aggregate resolveAggregate(Aggregate aggregate, List childrenOutput) { + private LogicalPlan resolveAggregate(Aggregate aggregate, List childrenOutput) { // if the grouping is resolved but the aggs are not, use the former to resolve the latter // e.g. STATS a ... GROUP BY a = x + 1 Holder changed = new Holder<>(false); List groupings = aggregate.groupings(); List aggregates = aggregate.aggregates(); + Function resolve = ua -> maybeResolveAttribute(ua, childrenOutput); // first resolve groupings since the aggs might refer to them // trying to globally resolve unresolved attributes will lead to some being marked as unresolvable if (Resolvables.resolved(groupings) == false) { List newGroupings = new ArrayList<>(groupings.size()); for (Expression g : groupings) { - Expression resolved = g.transformUp(UnresolvedAttribute.class, ua -> maybeResolveAttribute(ua, childrenOutput)); + Expression resolved = g.transformUp(UnresolvedAttribute.class, resolve); if (resolved != g) { changed.set(true); } @@ -596,8 +597,11 @@ private Aggregate resolveAggregate(Aggregate aggregate, List children // TODO: remove this when Stats interface is removed aggregate = changed.get() ? aggregate.with(aggregate.child(), groupings, newAggregates) : aggregate; } - - return aggregate; + if (aggregate instanceof TimeSeriesAggregate ts && ts.timestamp() instanceof UnresolvedAttribute unresolvedTimestamp) { + return ts.withTimestamp(maybeResolveAttribute(unresolvedTimestamp, childrenOutput)); + } else { + return aggregate; + } } private LogicalPlan resolveCompletion(Completion p, List childrenOutput) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/TimeSeriesGroupByAll.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/TimeSeriesGroupByAll.java index 9991f739e40b9..8323b7a7b5ac9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/TimeSeriesGroupByAll.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/TimeSeriesGroupByAll.java @@ -98,7 +98,8 @@ public LogicalPlan rule(TimeSeriesAggregate aggregate) { aggregate.child(), groupings, newAggregateFunctions, - null + null, + aggregate.timestamp() ); // insert the time_series return newStats.transformDown(EsRelation.class, r -> r.withAdditionalAttribute(timeSeries)); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java index 6a1793a763722..c156982a3ec95 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/TranslateTimeSeriesAggregate.java @@ -17,11 +17,9 @@ import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; -import org.elasticsearch.xpack.esql.core.tree.Node; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.util.CollectionUtils; import org.elasticsearch.xpack.esql.core.util.Holder; -import org.elasticsearch.xpack.esql.expression.function.TimestampAware; import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction; import org.elasticsearch.xpack.esql.expression.function.aggregate.DimensionValues; import org.elasticsearch.xpack.esql.expression.function.aggregate.HistogramMergeOverTime; @@ -167,23 +165,16 @@ public TranslateTimeSeriesAggregate() { @Override protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContext context) { Holder tsid = new Holder<>(); - Holder timestamp = new Holder<>(); aggregate.forEachDown(EsRelation.class, r -> { for (Attribute attr : r.output()) { if (attr.name().equals(MetadataAttribute.TSID_FIELD)) { tsid.set(attr); } - if (attr.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) { - timestamp.set(attr); - } } }); if (tsid.get() == null) { tsid.set(new MetadataAttribute(aggregate.source(), MetadataAttribute.TSID_FIELD, DataType.TSID_DATA_TYPE, false)); } - if (timestamp.get() == null) { - throw new IllegalArgumentException("@timestamp field is missing from the time-series source"); - } Map timeSeriesAggs = new HashMap<>(); List firstPassAggs = new ArrayList<>(); List secondPassAggs = new ArrayList<>(); @@ -230,7 +221,7 @@ protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContex if (aggField.dataType() == DataType.EXPONENTIAL_HISTOGRAM || aggField.dataType() == DataType.TDIGEST) { tsAgg = new HistogramMergeOverTime(af.source(), aggField, Literal.TRUE, af.window()); } else { - tsAgg = new LastOverTime(af.source(), aggField, af.window(), timestamp.get()); + tsAgg = new LastOverTime(af.source(), aggField, af.window(), aggregate.timestamp()); } final AggregateFunction firstStageFn; if (inlineFilter != null) { @@ -257,28 +248,6 @@ protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContex firstPassAggs.add(agg); } } - if (aggregate.child().output().contains(timestamp.get()) == false) { - var timestampAwareFunctions = timeSeriesAggs.keySet() - .stream() - .filter(ts -> ts instanceof TimestampAware) - .map(Node::sourceText) - .sorted() - .toList(); - if (timestampAwareFunctions.isEmpty() == false) { - int size = timestampAwareFunctions.size(); - throw new IllegalArgumentException( - "Function" - + (size > 1 ? "s " : " ") - + "[" - + String.join(", ", timestampAwareFunctions.subList(0, Math.min(size, 3))) - + (size > 3 ? ", ..." : "") - + "] require" - + (size > 1 ? " " : "s ") - + "a @timestamp field of type date or date_nanos to be present when run with the TS command, " - + "but it was not present." - ); - } - } // time-series aggregates must be grouped by _tsid (and time-bucket) first and re-group by users key List firstPassGroupings = new ArrayList<>(); firstPassGroupings.add(tsid.get()); @@ -288,18 +257,18 @@ protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContex Holder timeBucketRef = new Holder<>(); aggregate.child().forEachExpressionUp(NamedExpression.class, e -> { for (Expression child : e.children()) { - if (child instanceof Bucket bucket && bucket.field().equals(timestamp.get())) { + if (child instanceof Bucket bucket && bucket.field().equals(aggregate.timestamp())) { if (timeBucketRef.get() != null) { throw new IllegalArgumentException("expected at most one time bucket"); } timeBucketRef.set(e); - } else if (child instanceof TBucket tbucket && tbucket.timestamp().equals(timestamp.get())) { + } else if (child instanceof TBucket tbucket && tbucket.timestamp().equals(aggregate.timestamp())) { if (timeBucketRef.get() != null) { throw new IllegalArgumentException("expected at most one time tbucket"); } Bucket bucket = (Bucket) tbucket.surrogate(); timeBucketRef.set(new Alias(e.source(), bucket.functionName(), bucket, e.id())); - } else if (child instanceof DateTrunc dateTrunc && dateTrunc.field().equals(timestamp.get())) { + } else if (child instanceof DateTrunc dateTrunc && dateTrunc.field().equals(aggregate.timestamp())) { if (timeBucketRef.get() != null) { throw new IllegalArgumentException("expected at most one time bucket"); } @@ -365,7 +334,8 @@ protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContex newChild, firstPassGroupings, mergeExpressions(firstPassAggs, firstPassGroupings), - (Bucket) Alias.unwrap(timeBucket) + (Bucket) Alias.unwrap(timeBucket), + aggregate.timestamp() ); checkWindow(firstPhase); if (packDimensions.isEmpty()) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/promql/TranslatePromqlToTimeSeriesAggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/promql/TranslatePromqlToTimeSeriesAggregate.java index 7050e1a4fd85d..f32dfd5525a88 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/promql/TranslatePromqlToTimeSeriesAggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/promql/TranslatePromqlToTimeSeriesAggregate.java @@ -179,7 +179,7 @@ private static TimeSeriesAggregate createTimeSeriesAggregate(PromqlCommand promq groupings.add(grouping); } plan = new Eval(stepBucket.source(), plan, List.of(stepBucket)); - return new TimeSeriesAggregate(promqlCommand.promqlPlan().source(), plan, groupings, aggs, null); + return new TimeSeriesAggregate(promqlCommand.promqlPlan().source(), plan, groupings, aggs, null, promqlCommand.timestamp()); } private static FieldAttribute getTimeSeriesGrouping(List groupings) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java index d6650924a4625..cc191d43bf9e0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java @@ -471,7 +471,14 @@ public PlanFactory visitStatsCommand(EsqlBaseParser.StatsCommandContext ctx) { return input -> { if (input.anyMatch(p -> p instanceof Aggregate) == false && input.anyMatch(p -> p instanceof UnresolvedRelation ur && ur.indexMode() == IndexMode.TIME_SERIES)) { - return new TimeSeriesAggregate(source(ctx), input, stats.groupings, stats.aggregates, null); + return new TimeSeriesAggregate( + source(ctx), + input, + stats.groupings, + stats.aggregates, + null, + new UnresolvedTimestamp(source(ctx)) + ); } else { return new Aggregate(source(ctx), input, stats.groupings, stats.aggregates); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java index 2fb8c8e7ffc96..a9322e87fb738 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregate.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.esql.plan.logical; +import org.elasticsearch.TransportVersion; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -16,9 +17,11 @@ import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; +import org.elasticsearch.xpack.esql.core.expression.TypedAttribute; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.function.TimestampAware; import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction; import org.elasticsearch.xpack.esql.expression.function.aggregate.Count; import org.elasticsearch.xpack.esql.expression.function.aggregate.HistogramMergeOverTime; @@ -37,35 +40,49 @@ * An extension of {@link Aggregate} to perform time-series aggregation per time-series, such as rate or _over_time. * The grouping must be `_tsid` and `tbucket` or just `_tsid`. */ -public class TimeSeriesAggregate extends Aggregate { +public class TimeSeriesAggregate extends Aggregate implements TimestampAware { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( LogicalPlan.class, "TimeSeriesAggregate", TimeSeriesAggregate::new ); + private static final TransportVersion TIME_SERIES_AGGREGATE_TIMESTAMP = TransportVersion.fromName("time_series_aggregate_timestamp"); private final Bucket timeBucket; + private final Expression timestamp; public TimeSeriesAggregate( Source source, LogicalPlan child, List groupings, List aggregates, - Bucket timeBucket + Bucket timeBucket, + Expression timestamp ) { super(source, child, groupings, aggregates); this.timeBucket = timeBucket; + this.timestamp = timestamp; } public TimeSeriesAggregate(StreamInput in) throws IOException { super(in); this.timeBucket = in.readOptionalWriteable(inp -> (Bucket) Bucket.ENTRY.reader.read(inp)); + if (in.getTransportVersion().supports(TIME_SERIES_AGGREGATE_TIMESTAMP)) { + this.timestamp = in.readOptionalNamedWriteable(Expression.class); + } else { + // We only need the timestamp during analysis and logical optimization on the coordinator. + // Using null (when deserialized from an old node) in this case should be okay. + this.timestamp = null; + } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeOptionalWriteable(timeBucket); + if (out.getTransportVersion().supports(TIME_SERIES_AGGREGATE_TIMESTAMP)) { + out.writeOptionalNamedWriteable(timestamp); + } } @Override @@ -75,22 +92,29 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, TimeSeriesAggregate::new, child(), groupings, aggregates, timeBucket); + return NodeInfo.create(this, TimeSeriesAggregate::new, child(), groupings, aggregates, timeBucket, timestamp); } @Override public TimeSeriesAggregate replaceChild(LogicalPlan newChild) { - return new TimeSeriesAggregate(source(), newChild, groupings, aggregates, timeBucket); + return new TimeSeriesAggregate(source(), newChild, groupings, aggregates, timeBucket, timestamp); } @Override public TimeSeriesAggregate with(LogicalPlan child, List newGroupings, List newAggregates) { - return new TimeSeriesAggregate(source(), child, newGroupings, newAggregates, timeBucket); + return new TimeSeriesAggregate(source(), child, newGroupings, newAggregates, timeBucket, timestamp); + } + + public LogicalPlan withTimestamp(Expression newTimestamp) { + if (newTimestamp.equals(timestamp)) { + return this; + } + return new TimeSeriesAggregate(source(), child(), groupings, aggregates, timeBucket, newTimestamp); } @Override public boolean expressionsResolved() { - return super.expressionsResolved() && (timeBucket == null || timeBucket.resolved()); + return super.expressionsResolved() && (timeBucket == null || timeBucket.resolved()) && (timestamp == null || timestamp.resolved()); } @Nullable @@ -98,9 +122,14 @@ public Bucket timeBucket() { return timeBucket; } + @Override + public Expression timestamp() { + return timestamp; + } + @Override public int hashCode() { - return Objects.hash(groupings, aggregates, child(), timeBucket); + return Objects.hash(groupings, aggregates, child(), timeBucket, timestamp); } @Override @@ -117,7 +146,8 @@ public boolean equals(Object obj) { return Objects.equals(groupings, other.groupings) && Objects.equals(aggregates, other.aggregates) && Objects.equals(child(), other.child()) - && Objects.equals(timeBucket, other.timeBucket); + && Objects.equals(timeBucket, other.timeBucket) + && Objects.equals(timestamp, other.timestamp); } @Override @@ -206,6 +236,14 @@ public void postAnalysisVerification(Failures failures) { ); } }); + if ((timestamp instanceof TypedAttribute) == false || timestamp.dataType().isDate() == false) { + failures.add( + fail( + timestamp, + "the TS command requires a @timestamp field of type date or date_nanos to be present, but it was not present" + ) + ); + } } @Override diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java index daf37c2449ddd..d745a9662d2c2 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java @@ -3053,10 +3053,10 @@ public void testInlineStatsWithRateNotAllowed() { public void testLimitBeforeInlineStats_WithTS() { assumeTrue("LIMIT before INLINE STATS limitation check", EsqlCapabilities.Cap.FORBID_LIMIT_BEFORE_INLINE_STATS.isEnabled()); assertThat( - error("TS test | STATS m=max(languages) BY s=salary/10000 | LIMIT 5 | INLINE STATS max(s) BY m"), + error("TS k8s | STATS m=max(network.eth0.tx) BY pod, cluster | LIMIT 5 | INLINE STATS max(m) BY pod"), containsString( - "1:64: INLINE STATS cannot be used after an explicit or implicit LIMIT command, " - + "but was [INLINE STATS max(s) BY m] after [LIMIT 5] [@1:54]" + "1:67: INLINE STATS cannot be used after an explicit or implicit LIMIT command, " + + "but was [INLINE STATS max(m) BY pod] after [LIMIT 5] [@1:57]" ) ); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index fd5ff6b326f48..016cd7ec1ddbf 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -9850,46 +9850,37 @@ public void testFullTextFunctionOnEvalNull() { public void testTranslateMetricsAfterRenamingTimestamp() { assertThat( expectThrows( - IllegalArgumentException.class, + VerificationException.class, () -> logicalOptimizerWithLatestVersion.optimize(metricsAnalyzer.analyze(parser.parseQuery(""" TS k8s | EVAL @timestamp = region | STATS max(network.cost), count(network.eth0.rx) """))) ).getMessage(), - containsString(""" - Functions [count(network.eth0.rx), max(network.cost)] require a @timestamp field of type date or date_nanos \ - to be present when run with the TS command, but it was not present.""") + containsString("the TS command requires a @timestamp field of type date or date_nanos to be present, but it was not present") ); assertThat( expectThrows( - IllegalArgumentException.class, + VerificationException.class, () -> logicalOptimizerWithLatestVersion.optimize(metricsAnalyzer.analyze(parser.parseQuery(""" TS k8s | DISSECT event "%{@timestamp} %{network.total_bytes_in}" | STATS ohxqxpSqEZ = avg(network.eth0.currently_connected_clients) """))) ).getMessage(), - containsString(""" - Function [avg(network.eth0.currently_connected_clients)] requires a @timestamp field of type date or date_nanos \ - to be present when run with the TS command, but it was not present.""") + containsString("the TS command requires a @timestamp field of type date or date_nanos to be present, but it was not present") ); - // we may want to allow this later - assertThat( - expectThrows( - IllegalArgumentException.class, - () -> logicalOptimizerWithLatestVersion.optimize(metricsAnalyzer.analyze(parser.parseQuery(""" - TS k8s | - EVAL `@timestamp` = @timestamp + 1day | - STATS std_dev(network.eth0.currently_connected_clients) - """))) - ).getMessage(), - containsString(""" - Function [std_dev(network.eth0.currently_connected_clients)] requires a @timestamp field of type date or date_nanos \ - to be present when run with the TS command, but it was not present.""") - ); + var plan = logicalOptimizerWithLatestVersion.optimize(metricsAnalyzer.analyze(parser.parseQuery(""" + TS k8s | + EVAL `@timestamp` = @timestamp + 1day | + STATS std_dev(network.eth0.currently_connected_clients) + """))); + var tsStats = plan.collect(TimeSeriesAggregate.class).getFirst(); + assertThat(tsStats.timestamp().dataType(), equalTo(DataType.DATETIME)); + LastOverTime lastOverTime = as(Alias.unwrap(tsStats.aggregates().getFirst()), LastOverTime.class); + assertThat(lastOverTime.timestamp(), equalTo(tsStats.timestamp())); } /** diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java index 483183fb6a0d5..9c5fd6b4179b9 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java @@ -29,7 +29,10 @@ import org.elasticsearch.xpack.esql.core.expression.MapExpression; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute; +import org.elasticsearch.xpack.esql.core.expression.UnresolvedTimestamp; import org.elasticsearch.xpack.esql.core.expression.predicate.operator.comparison.BinaryComparison; +import org.elasticsearch.xpack.esql.core.tree.Location; +import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.expression.Order; import org.elasticsearch.xpack.esql.expression.UnresolvedNamePattern; @@ -2543,7 +2546,8 @@ public void testSimpleMetricsWithStats() { new Alias(EMPTY, "load", new UnresolvedFunction(EMPTY, "avg", DEFAULT, List.of(attribute("cpu")))), attribute("ts") ), - null + null, + new UnresolvedTimestamp(new Source(Location.EMPTY, "STATS load=avg(cpu) BY ts")) ) ); assertQuery( @@ -2556,7 +2560,8 @@ public void testSimpleMetricsWithStats() { new Alias(EMPTY, "load", new UnresolvedFunction(EMPTY, "avg", DEFAULT, List.of(attribute("cpu")))), attribute("ts") ), - null + null, + new UnresolvedTimestamp(new Source(Location.EMPTY, "STATS load=avg(cpu) BY ts")) ) ); assertQuery( @@ -2579,7 +2584,8 @@ public void testSimpleMetricsWithStats() { ), attribute("ts") ), - null + null, + new UnresolvedTimestamp(new Source(Location.EMPTY, "STATS load=avg(cpu),max(rate(requests)) BY ts")) ) ); assertQuery( @@ -2589,7 +2595,8 @@ public void testSimpleMetricsWithStats() { unresolvedTSRelation("foo*"), List.of(), List.of(new Alias(EMPTY, "count(errors)", new UnresolvedFunction(EMPTY, "count", DEFAULT, List.of(attribute("errors"))))), - null + null, + new UnresolvedTimestamp(new Source(Location.EMPTY, "STATS count(errors)")) ) ); assertQuery( @@ -2599,7 +2606,8 @@ public void testSimpleMetricsWithStats() { unresolvedTSRelation("foo*"), List.of(), List.of(new Alias(EMPTY, "a(b)", new UnresolvedFunction(EMPTY, "a", DEFAULT, List.of(attribute("b"))))), - null + null, + new UnresolvedTimestamp(new Source(Location.EMPTY, "STATS a(b)")) ) ); assertQuery( @@ -2609,7 +2617,8 @@ public void testSimpleMetricsWithStats() { unresolvedTSRelation("foo*"), List.of(), List.of(new Alias(EMPTY, "a(b)", new UnresolvedFunction(EMPTY, "a", DEFAULT, List.of(attribute("b"))))), - null + null, + new UnresolvedTimestamp(new Source(Location.EMPTY, "STATS a(b)")) ) ); assertQuery( @@ -2619,7 +2628,8 @@ public void testSimpleMetricsWithStats() { unresolvedTSRelation("foo*"), List.of(), List.of(new Alias(EMPTY, "a1(b2)", new UnresolvedFunction(EMPTY, "a1", DEFAULT, List.of(attribute("b2"))))), - null + null, + new UnresolvedTimestamp(new Source(Location.EMPTY, "STATS a1(b2)")) ) ); assertQuery( @@ -2633,7 +2643,8 @@ public void testSimpleMetricsWithStats() { attribute("c"), attribute("d.e") ), - null + null, + new UnresolvedTimestamp(new Source(Location.EMPTY, "STATS b = min(a) by c, d.e")) ) ); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregateSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregateSerializationTests.java index c1de30ac71234..a9129538ecd2d 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregateSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/TimeSeriesAggregateSerializationTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.expression.AbstractExpressionSerializationTests; import org.elasticsearch.xpack.esql.expression.function.grouping.Bucket; import org.elasticsearch.xpack.esql.expression.function.grouping.BucketSerializationTests; @@ -24,7 +25,14 @@ protected TimeSeriesAggregate createTestInstance() { List groupings = randomFieldAttributes(0, 5, false).stream().map(a -> (Expression) a).toList(); List aggregates = AggregateSerializationTests.randomAggregates(); Bucket timeBucket = BucketSerializationTests.createRandomBucket(configuration()); - return new TimeSeriesAggregate(source, child, groupings, aggregates, timeBucket); + return new TimeSeriesAggregate( + source, + child, + groupings, + aggregates, + timeBucket, + AbstractExpressionSerializationTests.randomChild() + ); } @Override @@ -42,7 +50,7 @@ protected TimeSeriesAggregate mutateInstance(TimeSeriesAggregate instance) throw case 2 -> aggregates = randomValueOtherThan(aggregates, AggregateSerializationTests::randomAggregates); case 3 -> timeBucket = randomValueOtherThan(timeBucket, () -> BucketSerializationTests.createRandomBucket(configuration())); } - return new TimeSeriesAggregate(instance.source(), child, groupings, aggregates, timeBucket); + return new TimeSeriesAggregate(instance.source(), child, groupings, aggregates, timeBucket, instance.timestamp()); } @Override diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/VerifierMetricsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/VerifierMetricsTests.java index 3412332657b09..701c6c0e2cbed 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/VerifierMetricsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/telemetry/VerifierMetricsTests.java @@ -724,8 +724,8 @@ public void testBinaryPlanAfterInlineStats() { public void testTimeSeriesAggregate() { assumeTrue("TS required", EsqlCapabilities.Cap.TS_COMMAND_V0.isEnabled()); Counters c = esql(""" - TS metrics - | STATS sum(avg_over_time(salary))"""); + TS k8s + | STATS sum(avg_over_time(network.cost))"""); assertEquals(0, dissect(c)); assertEquals(0, eval(c)); assertEquals(0, grok(c)); @@ -813,11 +813,10 @@ public void testBinaryPlanAfterSubqueryInFromCommand() { assertEquals(1L, function("min", c)); } - @AwaitsFix(bugUrl = "unresolved @timestamp field") public void testPromql() { assumeTrue("PromQL required", PromqlFeatures.isEnabled()); Counters c = esql(""" - PROMQL metrics step 5m (sum(salary))"""); + PROMQL index=k8s step=5m sum(network.cost)"""); assertEquals(0, dissect(c)); assertEquals(0, eval(c)); assertEquals(0, grok(c)); @@ -954,8 +953,9 @@ private Counters esql(String esql, Verifier v) { verifier = new Verifier(metrics, new XPackLicenseState(() -> 0L)); } IndexResolution metricsIndex = loadMapping("mapping-basic.json", "metrics", IndexMode.TIME_SERIES); + IndexResolution k8sIndex = loadMapping("k8s-mappings.json", "k8s", IndexMode.TIME_SERIES); IndexResolution employees = loadMapping("mapping-basic.json", "employees"); - analyzer(indexResolutions(metricsIndex, employees), verifier).analyze(EsqlParser.INSTANCE.parseQuery(esql)); + analyzer(indexResolutions(metricsIndex, k8sIndex, employees), verifier).analyze(EsqlParser.INSTANCE.parseQuery(esql)); return metrics == null ? null : metrics.stats(); }