Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
e2d9da7
ESQL: INLINESTATS
nik9000 Jun 11, 2024
14017d6
Explain
nik9000 Jun 11, 2024
355905a
More nocommit
nik9000 Jun 11, 2024
a634f90
More nocommit
nik9000 Jun 11, 2024
8f04e1b
Spotless
nik9000 Jun 11, 2024
3a3939b
Merge branch 'main' into inlinestats
nik9000 Jul 5, 2024
5483426
Works again
nik9000 Jul 5, 2024
7695f67
Closer
nik9000 Jul 6, 2024
64f858b
Merge branch 'main' into inlinestats
nik9000 Jul 8, 2024
141a63f
More test
nik9000 Jul 8, 2024
d0dc736
Share
nik9000 Jul 9, 2024
6947e1c
Merge branch 'main' into inlinestats
nik9000 Jul 9, 2024
cc44421
More
nik9000 Jul 9, 2024
8466a4e
ungrouped
nik9000 Jul 9, 2024
cc20b73
WIt P
nik9000 Jul 10, 2024
2f9b8af
Merge branch 'main' into inlinestats
nik9000 Jul 10, 2024
c4f1d87
Remove
nik9000 Jul 10, 2024
1408824
Remove unused
nik9000 Jul 10, 2024
2a38bc8
Merge branch 'main' into inlinestats
nik9000 Jul 10, 2024
b786c1c
More test
nik9000 Jul 10, 2024
547e5a5
Merge
nik9000 Jul 10, 2024
0eb2958
More nocommit
nik9000 Jul 10, 2024
32a03b2
explain
nik9000 Jul 11, 2024
966c860
Merge branch 'main' into inlinestats
nik9000 Jul 12, 2024
83252cf
WIP
nik9000 Jul 13, 2024
40d3fe9
Passes now?
nik9000 Jul 15, 2024
50fc6e2
one more exampl
nik9000 Jul 16, 2024
d33a445
Merge branch 'main' into inlinestats
nik9000 Jul 16, 2024
a868e7e
Javadoc
nik9000 Jul 16, 2024
0bbdef4
MOAR JAVADOC
nik9000 Jul 16, 2024
e19c769
Update docs/changelog/109583.yaml
nik9000 Jul 16, 2024
5d019f4
Changelog
nik9000 Jul 16, 2024
434bd9b
WIP
nik9000 Jul 16, 2024
0d5d0da
Merge branch 'main' into inlinestats
nik9000 Jul 16, 2024
e7eb532
Ready?
nik9000 Jul 17, 2024
908dfc9
Update docs
nik9000 Jul 17, 2024
2386fa8
Raname to line up with other stuff
nik9000 Jul 17, 2024
887b9ce
More
nik9000 Jul 17, 2024
2e028ce
Merge branch 'main' into inlinestats
nik9000 Jul 17, 2024
1dfe527
Merge branch 'main' into inlinestats
nik9000 Jul 18, 2024
ab350c4
Apply suggestions from code review
nik9000 Jul 18, 2024
2d90569
Merge remote-tracking branch 'nik9000/inlinestats' into inlinestats
nik9000 Jul 18, 2024
c937834
Update docs
nik9000 Jul 18, 2024
0a9332c
Updates
nik9000 Jul 18, 2024
c30230c
More explain and a couple renames
nik9000 Jul 18, 2024
9d12a29
Format
nik9000 Jul 19, 2024
6f690b7
Merge branch 'main' into inlinestats
nik9000 Jul 19, 2024
18d30f0
Some progress
nik9000 Jul 19, 2024
9ff0024
percentile
nik9000 Jul 19, 2024
c142d61
Better way?
nik9000 Jul 21, 2024
76998b7
Merge branch 'main' into inlinestats
nik9000 Jul 22, 2024
40b13df
techpreview
nik9000 Jul 22, 2024
48253a4
Merge branch 'main' into inlinestats
nik9000 Jul 22, 2024
674d93d
WIP
nik9000 Jul 22, 2024
4489b27
Update
nik9000 Jul 22, 2024
8074a95
Merge branch 'main' into inlinestats
nik9000 Jul 23, 2024
10b1fcd
Link
nik9000 Jul 23, 2024
fdb43d8
Check
nik9000 Jul 23, 2024
cbb1e60
Feature flag it
nik9000 Jul 23, 2024
8fbe301
Merge branch 'main' into inlinestats
nik9000 Jul 23, 2024
09d226a
WI{
nik9000 Jul 24, 2024
0e454f7
Merge branch 'main' into inlinestats
nik9000 Jul 24, 2024
a6ec9be
more skips
nik9000 Jul 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.elasticsearch.xpack.esql.plan.logical.EsqlAggregate;
import org.elasticsearch.xpack.esql.plan.logical.EsqlUnresolvedRelation;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.InlineStats;
import org.elasticsearch.xpack.esql.plan.logical.Keep;
import org.elasticsearch.xpack.esql.plan.logical.Lookup;
import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
Expand Down Expand Up @@ -333,6 +334,9 @@ private static class ResolveLookupTables extends ParameterizedAnalyzerRule<Looku

@Override
protected LogicalPlan rule(Lookup lookup, AnalyzerContext context) {
if (lookup.localRelation() != null) {
return lookup;
}
// the parser passes the string wrapped in a literal
Source source = lookup.source();
Expression tableNameExpression = lookup.tableName();
Expand Down Expand Up @@ -394,6 +398,10 @@ protected LogicalPlan doRule(LogicalPlan plan) {
return resolveAggregate(agg, childrenOutput);
}

if (plan instanceof InlineStats stats) {
return resolveInlineStats(stats, childrenOutput);
}

if (plan instanceof Drop d) {
return resolveDrop(d, childrenOutput);
}
Expand Down Expand Up @@ -478,6 +486,59 @@ private LogicalPlan resolveAggregate(Aggregate a, List<Attribute> childrenOutput
return a;
}

/**
 * Resolves the {@link UnresolvedAttribute}s of an {@link InlineStats} in two passes.
 * <p>
 * The groupings are resolved first, against {@code childrenOutput}, because the aggregates
 * may refer to them, e.g. {@code STATS a ... GROUP BY a = x + 1}. The aggregates are then
 * resolved against the resolved grouping attributes merged with {@code childrenOutput}.
 * Trying to resolve every unresolved attribute globally would lead to some of them being
 * marked as unresolvable.
 *
 * @param stats          the plan node to resolve
 * @param childrenOutput the attributes produced by the children of {@code stats}
 * @return a new {@link InlineStats} if any grouping or aggregate changed, otherwise {@code stats}
 */
private LogicalPlan resolveInlineStats(InlineStats stats, List<Attribute> childrenOutput) {
    List<Expression> groupings = stats.groupings();

    // Pass 1: groupings, resolved against the children's output only.
    if (Resolvables.resolved(groupings) == false) {
        boolean groupingsChanged = false;
        List<Expression> resolvedGroupings = new ArrayList<>(groupings.size());
        for (Expression grouping : groupings) {
            Expression candidate = grouping.transformUp(UnresolvedAttribute.class, ua -> maybeResolveAttribute(ua, childrenOutput));
            if (candidate != grouping) {
                groupingsChanged = true;
            }
            resolvedGroupings.add(candidate);
        }
        groupings = resolvedGroupings;
        if (groupingsChanged) {
            stats = new InlineStats(stats.source(), stats.child(), resolvedGroupings, stats.aggregates());
        }
    }

    if (stats.expressionsResolved()) {
        return stats;
    }

    // Pass 2: aggregates, resolved against the resolved grouping attributes plus the children's output.
    AttributeMap<Expression> groupingAttributes = new AttributeMap<>();
    for (Expression grouping : groupings) {
        Attribute attribute = Expressions.attribute(grouping);
        if (attribute == null || attribute.resolved() == false) {
            continue;
        }
        groupingAttributes.put(attribute, attribute);
    }
    List<Attribute> candidates = NamedExpressions.mergeOutputAttributes(new ArrayList<>(groupingAttributes.keySet()), childrenOutput);

    // A Holder is needed because the flag is flipped from inside the lambda.
    Holder<Boolean> aggregatesChanged = new Holder<>(false);
    List<NamedExpression> resolvedAggregates = new ArrayList<>();
    for (NamedExpression aggregate : stats.aggregates()) {
        Expression rewritten = aggregate.transformUp(UnresolvedAttribute.class, ua -> {
            Attribute maybeResolved = maybeResolveAttribute(ua, candidates);
            if (maybeResolved == null) {
                // leave the attribute untouched
                return ua;
            }
            aggregatesChanged.set(true);
            return maybeResolved;
        });
        resolvedAggregates.add((NamedExpression) rewritten);
    }

    if (aggregatesChanged.get() == false) {
        return stats;
    }
    return new InlineStats(stats.source(), stats.child(), groupings, resolvedAggregates);
}

private LogicalPlan resolveMvExpand(MvExpand p, List<Attribute> childrenOutput) {
if (p.target() instanceof UnresolvedAttribute ua) {
Attribute resolved = maybeResolveAttribute(ua, childrenOutput);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.Mapper;
import org.elasticsearch.xpack.esql.plugin.ComputeService;
import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
import org.elasticsearch.xpack.esql.session.EsqlIndexResolver;
import org.elasticsearch.xpack.esql.session.EsqlSession;
import org.elasticsearch.xpack.esql.stats.Metrics;
import org.elasticsearch.xpack.esql.stats.QueryMetric;

import java.util.function.BiConsumer;

import static org.elasticsearch.action.ActionListener.wrap;

public class PlanExecutor {
Expand Down Expand Up @@ -52,7 +55,8 @@ public void esql(
String sessionId,
EsqlConfiguration cfg,
EnrichPolicyResolver enrichPolicyResolver,
ActionListener<PhysicalPlan> listener
BiConsumer<PhysicalPlan, ActionListener<EsqlSession.Result>> runPhase,
ActionListener<EsqlSession.Result> listener
) {
final var session = new EsqlSession(
sessionId,
Expand All @@ -68,7 +72,7 @@ public void esql(
);
QueryMetric clientId = QueryMetric.fromString("rest");
metrics.total(clientId);
session.execute(request, wrap(listener::onResponse, ex -> {
session.execute(request, runPhase, wrap(listener::onResponse, ex -> {
// TODO when we decide if we will differentiate Kibana from REST, this String value will likely come from the request
metrics.failed(clientId);
listener.onFailure(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,40 @@

package org.elasticsearch.xpack.esql.plan.logical;

import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xpack.esql.core.capabilities.Resolvables;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.core.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.core.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.type.EsField;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.IntStream;

public class InlineStats extends UnaryPlan {
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;

public class InlineStats extends UnaryPlan implements Phased {

private final List<Expression> groupings;
private final List<? extends NamedExpression> aggregates;
private List<Attribute> output;
Comment thread
alex-spies marked this conversation as resolved.
Outdated

public InlineStats(Source source, LogicalPlan child, List<Expression> groupings, List<? extends NamedExpression> aggregates) {
super(source, child);
Expand Down Expand Up @@ -56,7 +73,34 @@ public boolean expressionsResolved() {

@Override
public List<Attribute> output() {
return Expressions.asAttributes(aggregates);
// NOCOMMIT when is this one called?
if (this.output == null) {
this.output = mergeOutputAttributes(Expressions.asAttributes(aggregates), child().output());

@alex-spies alex-spies Jul 19, 2024

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is subtly incorrect; the aggregates will contain the BY clause, which means that
ROW x = 1, y = 2 | INLINESTATS max(y) by x
produces the following output: y, max(y), x
but it should be: x, y, max(y), as shown e.g. by the csv test inlinestats.Two.

Unfortunately, when the groupings do not occur in child.output() (because they're an expression), it's correct to put them on the right hand side. Like in ROW y = 2 | INLINESTATS max(y) BY x = 1 the output should be y, max(y), x.

This method is currently unused, but will become important if we move the phase subdivision after the logical optimization.

I think the correct code should be

List<NamedExpression> addedFields = new ArrayList<>();
AttributeSet childOutput = child().outputSet();

for (NamedExpression agg: aggregates) {
  if (childOutput.contains(agg) == false) {
    addedFields.add(agg);
  }
}

this.output = mergeOutputExpressions(addedFields, child.output());

We could try and update this right away or at least leave a TODO here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated it now. I can't make a test for it that passes, but I've made one that doesn't pass and added an Ignore and stuck it in the follow-up. We need the "expressions in the right hand side" stuff first.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! This will matter once we move the phasing past the logical optimization (instead of after analysis); currently, InlineStats.output() should be dead code.

}
return output;
Comment thread
nik9000 marked this conversation as resolved.
Outdated
}

/**
 * Builds the plan for the first phase: an {@link Aggregate} over this node's child
 * that uses the same groupings and aggregates as this node.
 */
@Override
public LogicalPlan firstPhase() {
    Source origin = source();
    LogicalPlan input = child();
    return new Aggregate(origin, input, groupings, aggregates);
}

/**
 * Builds the plan for the second phase from the result of the first one.
 * <p>
 * The blocks of the single first-phase page are copied into a {@link LocalRelation}
 * described by {@code layout}, and the returned {@link Lookup} is built over this node's
 * child with the groupings (wrapped as named expressions) and that local relation.
 *
 * @param layout           the attributes describing the first-phase result
 * @param firstPhaseResult the pages produced by the first phase; only one page is supported
 * @return the {@link Lookup} built from the first-phase result
 * @throws UnsupportedOperationException if the first phase produced more than one page
 */
@Override
public LogicalPlan nextPhase(List<Attribute> layout, List<Page> firstPhaseResult) {
    // NOCOMMIT memory tracking
    if (firstPhaseResult.size() > 1) {
        throw new UnsupportedOperationException();
    }
    List<NamedExpression> namedGroupings = groupings.stream().map(Expressions::wrapAsNamed).toList();
    Page onlyPage = firstPhaseResult.get(0);
    Block[] copies = IntStream.range(0, onlyPage.getBlockCount())
        .mapToObj(index -> copyWithNonBreakingFactory(onlyPage.getBlock(index)))
        .toArray(Block[]::new);
    LocalRelation firstPhaseRows = new LocalRelation(source(), layout, LocalSupplier.of(copies));
    // The Lookup is given a placeholder attribute named "unused" alongside the local relation.
    ReferenceAttribute placeholder = new ReferenceAttribute(source(), "unused", DataType.KEYWORD);
    return new Lookup(source(), child(), placeholder, namedGroupings, firstPhaseRows);
}

/**
 * Copies every position of {@code original} into a new block built with
 * {@link PlannerUtils#NON_BREAKING_BLOCK_FACTORY}.
 */
private static Block copyWithNonBreakingFactory(Block original) {
    int positions = original.getPositionCount();
    Block.Builder builder = original.elementType().newBlockBuilder(positions, PlannerUtils.NON_BREAKING_BLOCK_FACTORY);
    builder.copyFrom(original, 0, positions);
    return builder.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.plan.logical;

import org.elasticsearch.compute.data.Page;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.core.util.Holder;

import java.util.List;

public interface Phased {
LogicalPlan firstPhase();
LogicalPlan nextPhase(List<Attribute> layout, List<Page> firstPhaseResult);

static LogicalPlan extractNextPhase(LogicalPlan plan) {
var firstPhase = new Holder<LogicalPlan>();
plan.forEachUp(t -> {
if (t instanceof Phased phased) {
firstPhase.set(phased.firstPhase());
}
});
Comment thread
alex-spies marked this conversation as resolved.
return firstPhase.get();
}

static LogicalPlan applyResultsFromNextPhase(LogicalPlan plan, List<Attribute> layout, List<Page> result) {
return plan.transformUp(logicalPlan -> {
// NOCOMMIT make sure this stops after the first one.
if (logicalPlan instanceof Phased phased) {
return phased.nextPhase(layout, result);
}
return logicalPlan;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Grok;
import org.elasticsearch.xpack.esql.plan.logical.InlineStats;
import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.Row;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
Expand All @@ -36,6 +38,7 @@
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalPlanOptimizer;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Phased;
import org.elasticsearch.xpack.esql.plan.logical.TopN;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
import org.elasticsearch.xpack.esql.session.EsqlSession;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -89,8 +90,6 @@
* Computes the result of a {@link PhysicalPlan}.
*/
public class ComputeService {
public record Result(List<Page> pages, List<DriverProfile> profiles) {}
Comment thread
nik9000 marked this conversation as resolved.
Outdated

private static final Logger LOGGER = LogManager.getLogger(ComputeService.class);
private final SearchService searchService;
private final BigArrays bigArrays;
Expand Down Expand Up @@ -136,7 +135,7 @@ public void execute(
CancellableTask rootTask,
PhysicalPlan physicalPlan,
EsqlConfiguration configuration,
ActionListener<Result> listener
ActionListener<EsqlSession.Result> listener
) {
Tuple<PhysicalPlan, PhysicalPlan> coordinatorAndDataNodePlan = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(
physicalPlan,
Expand Down Expand Up @@ -176,7 +175,7 @@ public void execute(
rootTask,
computeContext,
coordinatorPlan,
listener.map(driverProfiles -> new Result(collectedPages, driverProfiles))
listener.map(driverProfiles -> new EsqlSession.Result(collectedPages, physicalPlan.output(), driverProfiles))
);
return;
} else {
Expand All @@ -201,7 +200,9 @@ public void execute(
);
try (
Releasable ignored = exchangeSource.addEmptySink();
RefCountingListener refs = new RefCountingListener(listener.map(unused -> new Result(collectedPages, collectedProfiles)))
RefCountingListener refs = new RefCountingListener(
listener.map(unused -> new EsqlSession.Result(collectedPages, physicalPlan.output(), collectedProfiles))
)
) {
// run compute on the coordinator
exchangeSource.addCompletionListener(refs.acquire());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,35 +158,21 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
request.tables()
);
String sessionId = sessionID(task);
planExecutor.esql(
request,
sessionId,
configuration,
enrichPolicyResolver,
listener.delegateFailureAndWrap(
(delegate, physicalPlan) -> computeService.execute(
sessionId,
(CancellableTask) task,
physicalPlan,
configuration,
delegate.map(result -> {
List<ColumnInfo> columns = physicalPlan.output()
.stream()
.map(c -> new ColumnInfo(c.qualifiedName(), EsqlDataTypes.outputType(c.dataType())))
.toList();
EsqlQueryResponse.Profile profile = configuration.profile()
? new EsqlQueryResponse.Profile(result.profiles())
: null;
if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) {
String id = asyncTask.getExecutionId().getEncoded();
return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), id, false, request.async());
} else {
return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), request.async());
}
})
)
)
);
planExecutor.esql(request, sessionId, configuration, enrichPolicyResolver, (physicalPlan, resultListener) -> {
computeService.execute(sessionId, (CancellableTask) task, physicalPlan, configuration, resultListener);
}, listener.map(result -> {
List<ColumnInfo> columns = result.layout()
.stream()
.map(c -> new ColumnInfo(c.qualifiedName(), EsqlDataTypes.outputType(c.dataType())))
.toList();
EsqlQueryResponse.Profile profile = configuration.profile() ? new EsqlQueryResponse.Profile(result.profiles()) : null;
if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) {
String id = asyncTask.getExecutionId().getEncoded();
return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), id, false, request.async());
} else {
return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), request.async());
}
}));
}

/**
Expand Down
Loading