[ESQL] Add a BY clause to CHANGE_POINT command#145210
[ESQL] Add a BY clause to CHANGE_POINT command#145210darius-vil wants to merge 30 commits intoelastic:mainfrom
Conversation
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ChangePoint.java
Show resolved
Hide resolved
| // Group A: [0×15, 1×15] -> step at row 15 of page0 | ||
| // Group B: [1×15, 0×15] -> step at row 15 of page1 | ||
| // Group C: [0×15, 1×15] -> step at row 15 of page2 | ||
| List<Long> valuesColumn = Stream.of( |
There was a problem hiding this comment.
These are somewhat hard to read, but I really wanted some tests around groups spanning across pages...
| changePointConfiguration | ||
| : ON key=qualifiedName | ||
| | AS targetType=qualifiedName COMMA targetPvalue=qualifiedName | ||
| | {this.isDevVersion()}? BY grouping=qualifiedName |
There was a problem hiding this comment.
I assume {this.isDevVersion()}? is needed since LimitBy has it and we depend on it.
There was a problem hiding this comment.
Update: looks like LimitBy is about to remove it: #145225
Should we follow suit?
|
|
||
| /** | ||
| * Enables the feature LIMIT n BY expr1, expr2 for retaining at most n docs per group. | ||
| * The feature will not work if we had SORT | LIMIT n BY |
|
Pinging @elastic/es-analytical-engine (Team:Analytics) |
|
Hi @darius-vil, I've created a changelog YAML for you. |
|
|
||
| ```esql | ||
| CHANGE_POINT value [ON key] [AS type_name, pvalue_name] | ||
| CHANGE_POINT value [ON key] [BY group] [AS type_name, pvalue_name] |
There was a problem hiding this comment.
@darius-vil we must keep version changes relevant for all users on 9.x
this change is only relevant to 9.4+ so we need to tag it appropriately with applies_to tags
See this comment for pointers on that : #144300 (comment)
There was a problem hiding this comment.
you could keep the old one in a 9.whatever-9.foo tab and add a new one in a 9.4+ versioned tab (AKA applies switch)
There was a problem hiding this comment.
Thanks for pointing this out!
I meant to ask how do these work, but it completely slipped my mind in the end... I'll fix this in a sec
There was a problem hiding this comment.
For the time being, I marked all the new switches/sections/inline changes with stack: preview 9.4 and serverless: preview - I'm not sure what the correct values should be.
Since CHANGE_POINT BY hinges on recently introduced LIMIT BY, I guess our release lifecycle should follow theirs?
I'll keep this comment open until I find the answer
There was a problem hiding this comment.
cool you'll definitely need stack: ga|preview 9.4 but if it's ga you won't need any serverless tags :)
docs/reference/query-languages/esql/_snippets/commands/layout/change_point.md
Outdated
Show resolved
Hide resolved
docs/reference/query-languages/esql/_snippets/commands/layout/change_point.md
Outdated
Show resolved
Hide resolved
docs/reference/query-languages/esql/_snippets/commands/layout/change_point.md
Outdated
Show resolved
Hide resolved
| Attribute key, | ||
| Attribute targetType, | ||
| Attribute targetPvalue, | ||
| Attribute grouping |
There was a problem hiding this comment.
What about multiple groupings (CHANGE_POINT ... BY host, day, etc)) or general expressions (BY n+1, LENGTH(host))
I think this should be a List<Expression> comparable to LimitBy.
There was a problem hiding this comment.
Never considered multiple groups, my original proposal has always been CHANGE_POINT v [ON t] [AS k, l] [BY g] instead of CHANGE_POINT v [ON t] [AS k, l] [BY g1 [, g2 [, ... [, gN]]]].
I should have probably guessed though 😄 Thanks for pointing this out!
Luckily, this doesn't complicate things that much. This includes two major changes:
- change in grammar and the boilerplate around it
- tracking when changepoint needs to be invoked is the complicated part, but thanks to reusing this becomes trivial
Regarding supporting Expressions, I got them working by following the same path as LimitBy - they have a custom optimizer that replaces expressions with attributes:
I copied this approach, it works, but I am unsure whether it's the only and correct way of achieving this yet.| changePointConfiguration | ||
| : ON key=qualifiedName | ||
| | AS targetType=qualifiedName COMMA targetPvalue=qualifiedName | ||
| | {this.isDevVersion()}? BY grouping=qualifiedName |
There was a problem hiding this comment.
I think this should be (BY grouping=fields)? (see also StatsCommand)
There was a problem hiding this comment.
Hmm, not sure. Why do you think ()? is necessary here?
Notice that the actual changePointCommand is a few lines above
changePointCommand
: CHANGE_POINT value=qualifiedName (changePointConfiguration)*
;
that's where changePointConfiguration is used and it's wrapped in ()*, meaning any rules in changePointConfiguration can appear zero or more times. It's later verified in LogicalPlanBuilder that any changePointConfiguration rule appears at most 1 time.
May be helpful here: https://github.com/antlr/antlr4/blob/dev/doc/parser-rules.md#subrules
| ; | ||
|
|
||
|
|
||
| detect step change by group not nulled |
There was a problem hiding this comment.
Please add test for:
- multiple grouping (
by a,b,c) - expression grouping (
by a+1, "hi")
| // Grouping must be sortable | ||
| if (grouping != null) { | ||
| type = grouping.dataType(); | ||
| if (DataType.isSortable(type) == false) { |
There was a problem hiding this comment.
I don't see why this must be sortable.
Is that the case with LimitBy as well?
There was a problem hiding this comment.
My thinking at the time of writing was:
ChangePoint invokes LimitBy with the expectation that it sorts our input by grouping(s), which means grouping(s) has to be sortable and this is the place to verify whether grouping(s) is sortable?
I think if we don't do this here, it would simply fail in OrderBy with a similar cause:
CHANGE_POINT grouping only supports sortable values, found expression
There was a problem hiding this comment.
LimitBy doesn't necessarily sorts the input by grouping(s) (as far as I know). It just groups by it. You can do that without sorting, e.g. by using a hash table.
| * data that is passed to it, runs the change point detector on the data (which | ||
| * is a compute-heavy process), and then outputs all data with the change points. | ||
| */ | ||
| public class ChangePointOperator extends CompleteInputCollectorOperator { |
There was a problem hiding this comment.
This is not correct anymore:
ChangePointOperator used to be a CompleteInputCollectorOperator, because it would collect 1001 rows at max.
The new changepoint by, on the other hand, can get unlimited amounts of data, and needs to be streaming.
| private void createOutputPages() { | ||
| List<Double> values = new ArrayList<>(); | ||
| List<Integer> bucketIndexes = new ArrayList<>(); | ||
| ArrayDeque<DetectedChangePoint> detectedChangePoints = new ArrayDeque<>(); |
There was a problem hiding this comment.
I don't see why we need this.
The way I imagine this working:
- process
addInput(Page page)calls until you hit a row with group key != previous group key - different group key triggers detecting the change point in the group
- create the output for the group
jan-elastic
left a comment
There was a problem hiding this comment.
I gave this a first pass, see comments. I think there's some functionality missing.
This introduces a
BYclause to CHANGE_POINT: https://www.elastic.co/docs/reference/query-languages/esql/commands/change-pointThe most complicated change is in ChangePointOperator, which now has the added complexity of keeping track of group changes in the sorted input.