Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9257000
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.4.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
jina_ai_embedding_refactor,9256000
time_series_aggregate_timestamp,9257000
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public abstract class GenerativeRestTest extends ESRestTestCase implements Query
"implicit time-series aggregation function .* doesn't support type .*",
"INLINE STATS .* can only be used after STATS when used with TS command",
"cannot group by a metric field .* in a time-series aggregation",
"a @timestamp field of type date or date_nanos to be present when run with the TS command, but it was not present",
"a @timestamp field of type date or date_nanos to be present",
"Output has changed from \\[.*\\] to \\[.*\\]" // https://github.com/elastic/elasticsearch/issues/134794
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -751,18 +751,6 @@ maxRate:double | tbucket:datetime
0.05898 | 2024-05-10T00:00:00.000Z
;

Rename timestamp when aggs don't require timestamp
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to reviewer: this now throws an error as the TS|STATS command itself requires the timestamp, even though max and max_over_time don't require the timestamp. I think the new behavior makes more sense and is more predictable.

required_capability: ts_command_v0

TS k8s
| RENAME @timestamp AS newTs
| STATS mx = max(max_over_time(network.eth0.tx)) BY tbucket = bucket(newTs, 1hour)
;

mx:integer | tbucket:datetime
1716 | 2024-05-10T00:00:00.000Z
;

instant_selector_dimensions_promql
required_capability: promql_pre_tech_preview_v12

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,18 +533,19 @@ protected LogicalPlan rule(LogicalPlan plan, AnalyzerContext context) {
};
}

private Aggregate resolveAggregate(Aggregate aggregate, List<Attribute> childrenOutput) {
private LogicalPlan resolveAggregate(Aggregate aggregate, List<Attribute> childrenOutput) {
// if the grouping is resolved but the aggs are not, use the former to resolve the latter
// e.g. STATS a ... GROUP BY a = x + 1
Holder<Boolean> changed = new Holder<>(false);
List<Expression> groupings = aggregate.groupings();
List<? extends NamedExpression> aggregates = aggregate.aggregates();
Function<UnresolvedAttribute, Expression> resolve = ua -> maybeResolveAttribute(ua, childrenOutput);
// first resolve groupings since the aggs might refer to them
// trying to globally resolve unresolved attributes will lead to some being marked as unresolvable
if (Resolvables.resolved(groupings) == false) {
List<Expression> newGroupings = new ArrayList<>(groupings.size());
for (Expression g : groupings) {
Expression resolved = g.transformUp(UnresolvedAttribute.class, ua -> maybeResolveAttribute(ua, childrenOutput));
Expression resolved = g.transformUp(UnresolvedAttribute.class, resolve);
if (resolved != g) {
changed.set(true);
}
Expand Down Expand Up @@ -596,8 +597,11 @@ private Aggregate resolveAggregate(Aggregate aggregate, List<Attribute> children
// TODO: remove this when Stats interface is removed
aggregate = changed.get() ? aggregate.with(aggregate.child(), groupings, newAggregates) : aggregate;
}

return aggregate;
if (aggregate instanceof TimeSeriesAggregate ts && ts.timestamp() instanceof UnresolvedAttribute unresolvedTimestamp) {
return ts.withTimestamp(maybeResolveAttribute(unresolvedTimestamp, childrenOutput));
} else {
return aggregate;
}
}

private LogicalPlan resolveCompletion(Completion p, List<Attribute> childrenOutput) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public LogicalPlan rule(TimeSeriesAggregate aggregate) {
aggregate.child(),
groupings,
newAggregateFunctions,
null
null,
aggregate.timestamp()
);
// insert the time_series
return newStats.transformDown(EsRelation.class, r -> r.withAdditionalAttribute(timeSeries));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.tree.Node;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.util.CollectionUtils;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.expression.function.TimestampAware;
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
import org.elasticsearch.xpack.esql.expression.function.aggregate.DimensionValues;
import org.elasticsearch.xpack.esql.expression.function.aggregate.HistogramMergeOverTime;
Expand Down Expand Up @@ -167,23 +165,16 @@ public TranslateTimeSeriesAggregate() {
@Override
protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContext context) {
Holder<Attribute> tsid = new Holder<>();
Holder<Attribute> timestamp = new Holder<>();
aggregate.forEachDown(EsRelation.class, r -> {
for (Attribute attr : r.output()) {
if (attr.name().equals(MetadataAttribute.TSID_FIELD)) {
tsid.set(attr);
}
if (attr.name().equals(MetadataAttribute.TIMESTAMP_FIELD)) {
timestamp.set(attr);
}
}
});
if (tsid.get() == null) {
tsid.set(new MetadataAttribute(aggregate.source(), MetadataAttribute.TSID_FIELD, DataType.TSID_DATA_TYPE, false));
}
if (timestamp.get() == null) {
throw new IllegalArgumentException("@timestamp field is missing from the time-series source");
}
Map<AggregateFunction, Alias> timeSeriesAggs = new HashMap<>();
List<NamedExpression> firstPassAggs = new ArrayList<>();
List<NamedExpression> secondPassAggs = new ArrayList<>();
Expand Down Expand Up @@ -230,7 +221,7 @@ protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContex
if (aggField.dataType() == DataType.EXPONENTIAL_HISTOGRAM || aggField.dataType() == DataType.TDIGEST) {
tsAgg = new HistogramMergeOverTime(af.source(), aggField, Literal.TRUE, af.window());
} else {
tsAgg = new LastOverTime(af.source(), aggField, af.window(), timestamp.get());
tsAgg = new LastOverTime(af.source(), aggField, af.window(), aggregate.timestamp());
}
final AggregateFunction firstStageFn;
if (inlineFilter != null) {
Expand All @@ -257,28 +248,6 @@ protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContex
firstPassAggs.add(agg);
}
}
if (aggregate.child().output().contains(timestamp.get()) == false) {
var timestampAwareFunctions = timeSeriesAggs.keySet()
.stream()
.filter(ts -> ts instanceof TimestampAware)
.map(Node::sourceText)
.sorted()
.toList();
if (timestampAwareFunctions.isEmpty() == false) {
int size = timestampAwareFunctions.size();
throw new IllegalArgumentException(
"Function"
+ (size > 1 ? "s " : " ")
+ "["
+ String.join(", ", timestampAwareFunctions.subList(0, Math.min(size, 3)))
+ (size > 3 ? ", ..." : "")
+ "] require"
+ (size > 1 ? " " : "s ")
+ "a @timestamp field of type date or date_nanos to be present when run with the TS command, "
+ "but it was not present."
);
}
}
// time-series aggregates must be grouped by _tsid (and time-bucket) first and re-group by users key
List<Expression> firstPassGroupings = new ArrayList<>();
firstPassGroupings.add(tsid.get());
Expand All @@ -288,18 +257,18 @@ protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContex
Holder<NamedExpression> timeBucketRef = new Holder<>();
aggregate.child().forEachExpressionUp(NamedExpression.class, e -> {
for (Expression child : e.children()) {
if (child instanceof Bucket bucket && bucket.field().equals(timestamp.get())) {
if (child instanceof Bucket bucket && bucket.field().equals(aggregate.timestamp())) {
if (timeBucketRef.get() != null) {
throw new IllegalArgumentException("expected at most one time bucket");
}
timeBucketRef.set(e);
} else if (child instanceof TBucket tbucket && tbucket.timestamp().equals(timestamp.get())) {
} else if (child instanceof TBucket tbucket && tbucket.timestamp().equals(aggregate.timestamp())) {
if (timeBucketRef.get() != null) {
throw new IllegalArgumentException("expected at most one time tbucket");
}
Bucket bucket = (Bucket) tbucket.surrogate();
timeBucketRef.set(new Alias(e.source(), bucket.functionName(), bucket, e.id()));
} else if (child instanceof DateTrunc dateTrunc && dateTrunc.field().equals(timestamp.get())) {
} else if (child instanceof DateTrunc dateTrunc && dateTrunc.field().equals(aggregate.timestamp())) {
if (timeBucketRef.get() != null) {
throw new IllegalArgumentException("expected at most one time bucket");
}
Expand Down Expand Up @@ -365,7 +334,8 @@ protected LogicalPlan rule(TimeSeriesAggregate aggregate, LogicalOptimizerContex
newChild,
firstPassGroupings,
mergeExpressions(firstPassAggs, firstPassGroupings),
(Bucket) Alias.unwrap(timeBucket)
(Bucket) Alias.unwrap(timeBucket),
aggregate.timestamp()
);
checkWindow(firstPhase);
if (packDimensions.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private static TimeSeriesAggregate createTimeSeriesAggregate(PromqlCommand promq
groupings.add(grouping);
}
plan = new Eval(stepBucket.source(), plan, List.of(stepBucket));
return new TimeSeriesAggregate(promqlCommand.promqlPlan().source(), plan, groupings, aggs, null);
return new TimeSeriesAggregate(promqlCommand.promqlPlan().source(), plan, groupings, aggs, null, promqlCommand.timestamp());
}

private static FieldAttribute getTimeSeriesGrouping(List<Attribute> groupings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,14 @@ public PlanFactory visitStatsCommand(EsqlBaseParser.StatsCommandContext ctx) {
return input -> {
if (input.anyMatch(p -> p instanceof Aggregate) == false
&& input.anyMatch(p -> p instanceof UnresolvedRelation ur && ur.indexMode() == IndexMode.TIME_SERIES)) {
return new TimeSeriesAggregate(source(ctx), input, stats.groupings, stats.aggregates, null);
return new TimeSeriesAggregate(
source(ctx),
input,
stats.groupings,
stats.aggregates,
null,
new UnresolvedTimestamp(source(ctx))
);
} else {
return new Aggregate(source(ctx), input, stats.groupings, stats.aggregates);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.esql.plan.logical;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -16,9 +17,11 @@
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.expression.TypedAttribute;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.expression.function.TimestampAware;
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
import org.elasticsearch.xpack.esql.expression.function.aggregate.HistogramMergeOverTime;
Expand All @@ -37,35 +40,49 @@
* An extension of {@link Aggregate} to perform time-series aggregation per time-series, such as rate or _over_time.
* The grouping must be `_tsid` and `tbucket` or just `_tsid`.
*/
public class TimeSeriesAggregate extends Aggregate {
public class TimeSeriesAggregate extends Aggregate implements TimestampAware {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
LogicalPlan.class,
"TimeSeriesAggregate",
TimeSeriesAggregate::new
);
private static final TransportVersion TIME_SERIES_AGGREGATE_TIMESTAMP = TransportVersion.fromName("time_series_aggregate_timestamp");

private final Bucket timeBucket;
private final Expression timestamp;

/**
 * Creates a time-series aggregation node.
 *
 * @param source     source location of the originating query fragment
 * @param child      input plan producing the rows to aggregate
 * @param groupings  grouping expressions (typically {@code _tsid} plus an optional time bucket)
 * @param aggregates aggregate expressions to compute
 * @param timeBucket the time-bucket grouping, or {@code null} when absent
 * @param timestamp  the {@code @timestamp} expression the TS command resolved, or {@code null}
 *                   (e.g. when deserialized from a node that predates this field)
 */
public TimeSeriesAggregate(
    Source source,
    LogicalPlan child,
    List<Expression> groupings,
    List<? extends NamedExpression> aggregates,
    Bucket timeBucket,
    Expression timestamp
) {
    super(source, child, groupings, aggregates);
    this.timeBucket = timeBucket;
    this.timestamp = timestamp;
}

/**
 * Deserialization constructor. Reads the base {@link Aggregate} state, then the optional
 * time bucket, and — on transport versions that support it — the optional timestamp.
 *
 * @param in the stream to read from
 * @throws IOException if reading from the stream fails
 */
public TimeSeriesAggregate(StreamInput in) throws IOException {
super(in);
// Time bucket is serialized as an optional writeable using Bucket's registered reader.
this.timeBucket = in.readOptionalWriteable(inp -> (Bucket) Bucket.ENTRY.reader.read(inp));
if (in.getTransportVersion().supports(TIME_SERIES_AGGREGATE_TIMESTAMP)) {
this.timestamp = in.readOptionalNamedWriteable(Expression.class);
} else {
// We only need the timestamp during analysis and logical optimization on the coordinator.
// Using null (when deserialized from an old node) in this case should be okay.
this.timestamp = null;
}
}

/**
 * Serializes this node: base {@link Aggregate} state, the optional time bucket, and —
 * only when the receiving node's transport version supports it — the optional timestamp.
 * Mirrors the reading logic in {@code TimeSeriesAggregate(StreamInput)}.
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalWriteable(timeBucket);
// Older nodes do not know about the timestamp field; skip it to stay wire-compatible.
if (out.getTransportVersion().supports(TIME_SERIES_AGGREGATE_TIMESTAMP)) {
out.writeOptionalNamedWriteable(timestamp);
}
}

@Override
Expand All @@ -75,32 +92,44 @@ public String getWriteableName() {

/**
 * Node info for tree transformations. Includes {@code timestamp} so rewrites of this
 * node preserve the resolved @timestamp expression.
 */
@Override
protected NodeInfo<Aggregate> info() {
    return NodeInfo.create(this, TimeSeriesAggregate::new, child(), groupings, aggregates, timeBucket, timestamp);
}

/**
 * Returns a copy of this node with a new child, carrying over groupings, aggregates,
 * the time bucket, and the timestamp unchanged.
 */
@Override
public TimeSeriesAggregate replaceChild(LogicalPlan newChild) {
    return new TimeSeriesAggregate(source(), newChild, groupings, aggregates, timeBucket, timestamp);
}

/**
 * Returns a copy of this node with new groupings and aggregates, preserving the
 * child, time bucket, and timestamp.
 */
@Override
public TimeSeriesAggregate with(LogicalPlan child, List<Expression> newGroupings, List<? extends NamedExpression> newAggregates) {
    return new TimeSeriesAggregate(source(), child, newGroupings, newAggregates, timeBucket, timestamp);
}

/**
 * Returns a copy of this node with the given timestamp expression, or {@code this}
 * when the timestamp is already equal to {@code newTimestamp}.
 */
public LogicalPlan withTimestamp(Expression newTimestamp) {
    return newTimestamp.equals(timestamp)
        ? this
        : new TimeSeriesAggregate(source(), child(), groupings, aggregates, timeBucket, newTimestamp);
}

/**
 * Resolved only when the base aggregate is resolved AND the optional time bucket AND
 * the optional timestamp (when present) are each resolved.
 */
@Override
public boolean expressionsResolved() {
    return super.expressionsResolved() && (timeBucket == null || timeBucket.resolved()) && (timestamp == null || timestamp.resolved());
}

/**
 * The time-bucket grouping of this aggregation, or {@code null} when there is none.
 */
@Nullable
public Bucket timeBucket() {
return timeBucket;
}

/**
 * The {@code @timestamp} expression associated with this time-series aggregation.
 * May be {@code null} — e.g. when deserialized from an older node; see the stream constructor.
 */
@Override
public Expression timestamp() {
return timestamp;
}

/**
 * Hash over the same components compared by {@code equals}: groupings, aggregates,
 * child, time bucket, and timestamp. Keep in sync with {@code equals}.
 */
@Override
public int hashCode() {
    return Objects.hash(groupings, aggregates, child(), timeBucket, timestamp);
}

@Override
Expand All @@ -117,7 +146,8 @@ public boolean equals(Object obj) {
return Objects.equals(groupings, other.groupings)
&& Objects.equals(aggregates, other.aggregates)
&& Objects.equals(child(), other.child())
&& Objects.equals(timeBucket, other.timeBucket);
&& Objects.equals(timeBucket, other.timeBucket)
&& Objects.equals(timestamp, other.timestamp);
}

@Override
Expand Down Expand Up @@ -206,6 +236,14 @@ public void postAnalysisVerification(Failures failures) {
);
}
});
if ((timestamp instanceof TypedAttribute) == false || timestamp.dataType().isDate() == false) {
failures.add(
fail(
timestamp,
"the TS command requires a @timestamp field of type date or date_nanos to be present, but it was not present"
)
);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3053,10 +3053,10 @@ public void testInlineStatsWithRateNotAllowed() {
public void testLimitBeforeInlineStats_WithTS() {
assumeTrue("LIMIT before INLINE STATS limitation check", EsqlCapabilities.Cap.FORBID_LIMIT_BEFORE_INLINE_STATS.isEnabled());
assertThat(
error("TS test | STATS m=max(languages) BY s=salary/10000 | LIMIT 5 | INLINE STATS max(s) BY m"),
error("TS k8s | STATS m=max(network.eth0.tx) BY pod, cluster | LIMIT 5 | INLINE STATS max(m) BY pod"),
containsString(
"1:64: INLINE STATS cannot be used after an explicit or implicit LIMIT command, "
+ "but was [INLINE STATS max(s) BY m] after [LIMIT 5] [@1:54]"
"1:67: INLINE STATS cannot be used after an explicit or implicit LIMIT command, "
+ "but was [INLINE STATS max(m) BY pod] after [LIMIT 5] [@1:57]"
)
);
}
Expand Down
Loading