Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
0307f52
Add BY to grammar
darius-vil Mar 26, 2026
081a557
Ad csv tests
darius-vil Mar 26, 2026
769fc78
Implement CHANGE_POINT BY
darius-vil Mar 26, 2026
7379341
Add column sortable check
darius-vil Mar 26, 2026
16db485
Fix factory describe
darius-vil Mar 26, 2026
b2f0161
Apply spotless
darius-vil Mar 26, 2026
a70e5b9
Introduce nullable groupingChannel
darius-vil Mar 26, 2026
e14cccb
Refactor methods
darius-vil Mar 27, 2026
294dec4
Fix edge case
darius-vil Mar 27, 2026
057d781
Collect changepoints in a queue
darius-vil Mar 27, 2026
254a3f6
Add tests around changepoints split across pages
darius-vil Mar 27, 2026
59a30a3
Fix bug and add more unit tests
darius-vil Mar 30, 2026
bed2254
Merge branch 'main' into changepoint-limit-by
darius-vil Mar 30, 2026
f75395c
Add BY parser tests after merge
darius-vil Mar 30, 2026
8233ba2
Fix empty input and nulls in the middle group
darius-vil Mar 30, 2026
decb5a9
Refactor unit test
darius-vil Mar 30, 2026
6dbd9cd
Remove TODOs
darius-vil Mar 30, 2026
8113031
[CI] Auto commit changes from spotless
Mar 30, 2026
a1d9975
Count value limit per group
darius-vil Mar 31, 2026
d4cdc6b
Update docs
darius-vil Mar 31, 2026
1ac27e9
Minor fixes
darius-vil Mar 31, 2026
d66805c
Fix test checkstyle
darius-vil Mar 31, 2026
862fbae
[CI] Auto commit changes from spotless
Mar 31, 2026
5d2337a
Fix typo in test
darius-vil Mar 31, 2026
7a45883
Update docs/changelog/145210.yaml
darius-vil Mar 31, 2026
583c302
Add versioning to docs
darius-vil Mar 31, 2026
f1a0a57
Merge branch 'main' into changepoint-limit-by
darius-vil Apr 1, 2026
f8f57b6
Add CHANGE_POINT BY generative test
darius-vil Apr 2, 2026
2a15f9a
Merge branch 'main' into changepoint-limit-by
darius-vil Apr 2, 2026
fead1ff
[CI] Auto commit changes from spotless
Apr 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/145210.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
area: ES|QL
issues: []
pr: 145210
summary: Add a BY clause to CHANGE_POINT command
type: feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
% This is generated by ESQL's CommandDocsTests. Do not edit it. See ../README.md for how to regenerate it.

```esql
ROW k=[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25], g=[0,1,2]
| MV_EXPAND k
| MV_EXPAND g
| EVAL value=CASE(k<13, 0, 42)
| CHANGE_POINT value ON k BY g
| WHERE type IS NOT NULL
```

| k:integer | g:integer | value:integer | type:keyword | pvalue:double |
| --- | --- | --- | --- | --- |
| 13 | 0 | 42 | step_change | 0.0 |
| 13 | 1 | 42 | step_change | 0.0 |
| 13 | 2 | 42 | step_change | 0.0 |
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,22 @@ The `CHANGE_POINT` command requires a [platinum license](https://www.elastic.co/

## Syntax

::::{applies-switch}

:::{applies-item} { stack: ga 9.2+, "serverless": "ga"}
```esql
CHANGE_POINT value [ON key] [AS type_name, pvalue_name]
```
:::

:::{applies-item} { "stack": "preview 9.4", "serverless": "preview" }
```esql
CHANGE_POINT value [ON key] [BY group] [AS type_name, pvalue_name]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@darius-vil we must keep version changes relevant for all users on 9.x

this change is only relevant to 9.4+ so we need to tag it appropriately with applies_to tags

See this comment for pointers on that : #144300 (comment)

Copy link
Copy Markdown
Member

@leemthompo leemthompo Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could keep the old one in a 9.whatever-9.foo tab and add a new one in a 9.4+ versioned tab (AKA applies switch)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out!

I meant to ask how do these work, but it completely slipped my mind in the end... I'll fix this in a sec

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the time being, I marked all the new switches/sections/inline changes with stack: preview 9.4 and serverless: preview - I'm not sure what the correct values should be.

Since CHANGE_POINT BY hinges on recently introduced LIMIT BY, I guess our release lifecycle should follow theirs?

I'll keep this comment open until I find the answer

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool you'll definitely need stack: ga|preview 9.4 but if it's ga you won't need any serverless tags :)

```
:::

::::


## Parameters

Expand All @@ -23,6 +36,9 @@ CHANGE_POINT value [ON key] [AS type_name, pvalue_name]
`key`
: The column with the key to order the values by. If not specified, `@timestamp` is used.

`group` {applies_to}`stack: preview 9.4` {applies_to}`serverless: preview`
: The column to group values by. When specified, change point detection is performed independently for each group.

`type_name`
: The name of the output column with the change point type. If not specified, `type` is used.

Expand All @@ -44,6 +60,8 @@ The possible change point types are:

::::{note}
There must be at least 22 values for change point detection. Any values beyond the first 1,000 are ignored.

When a `BY` clause is provided, these rules apply per group. {applies_to}`stack: preview 9.4` {applies_to}`serverless: preview`
::::

## Examples
Expand All @@ -52,3 +70,8 @@ The following example detects a step change in a metric:

:::{include} ../examples/change_point.csv-spec/changePointForDocs.md
:::

The following example detects a step change independently for each group: {applies_to}`stack: preview 9.4` {applies_to}`serverless: preview`

:::{include} ../examples/change_point.csv-spec/changePointForDocsByGroup.md
:::
860 changes: 430 additions & 430 deletions muted-tests.yml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.compute.operator;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.Strings;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockUtils;
Expand All @@ -25,6 +26,7 @@
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Objects;

/**
* Find spikes, dips and change point in a list of values.
Expand All @@ -37,29 +39,35 @@ public class ChangePointOperator extends CompleteInputCollectorOperator {
private static final Logger logger = LogManager.getLogger(ChangePointOperator.class);
public static final int INPUT_VALUE_COUNT_LIMIT = 1000;

public record Factory(int channel, WarningSourceLocation source) implements OperatorFactory {
private record DetectedChangePoint(int index, ChangeType type) {}

public record Factory(int channel, Integer groupingChannel, WarningSourceLocation source) implements OperatorFactory {

@Override
public Operator get(DriverContext driverContext) {
return new ChangePointOperator(driverContext, channel, source);
return new ChangePointOperator(driverContext, channel, groupingChannel, source);
}

@Override
public String describe() {
return "ChangePointOperator[channel=" + channel + "]";
return groupingChannel == null
? Strings.format("ChangePointOperator[channel=%d]", channel)
: Strings.format("ChangePointOperator[channel=%d, groupingChannel=%d]", channel, groupingChannel);
}
}

private final DriverContext driverContext;
private final int channel;
private final Integer groupChannel;
private final WarningSourceLocation source;

private final Deque<Page> outputPages;
private Warnings warnings;

public ChangePointOperator(DriverContext driverContext, int channel, WarningSourceLocation source) {
public ChangePointOperator(DriverContext driverContext, int channel, Integer groupingChannel, WarningSourceLocation source) {
this.driverContext = driverContext;
this.channel = channel;
this.groupChannel = groupingChannel;
this.source = source;

outputPages = new ArrayDeque<>();
Expand All @@ -71,25 +79,67 @@ public boolean canProduceMoreDataWithoutExtraInput() {
return outputPages.isEmpty() == false;
}

private void createOutputPages() {
int valuesCount = 0;
for (Page page : inputPages) {
valuesCount += page.getPositionCount();
}
boolean tooManyValues = valuesCount > INPUT_VALUE_COUNT_LIMIT;
if (tooManyValues) {
valuesCount = INPUT_VALUE_COUNT_LIMIT;
}
private ChangeType detectChangePoint(List<Double> values, List<Integer> bucketIndexes) {
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
null,
values.stream().mapToDouble(Double::doubleValue).toArray(),
bucketIndexes.stream().mapToInt(Integer::intValue).toArray()
);
ChangeType changeType = ChangePointDetector.getChangeType(bucketValues);
return changeType;
}

List<Double> values = new ArrayList<>(valuesCount);
List<Integer> bucketIndexes = new ArrayList<>(valuesCount);
private void createOutputPages() {
List<Double> values = new ArrayList<>();
List<Integer> bucketIndexes = new ArrayList<>();
ArrayDeque<DetectedChangePoint> detectedChangePoints = new ArrayDeque<>();
Copy link
Copy Markdown
Contributor

@jan-elastic jan-elastic Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why we need this.

The way I imagine this working:

  • process addInput(Page page) calls until you hit a row with group key != previous group key
  • different group key triggers detecting the change point in the group
  • create the output for the group

int valuesIndex = 0;
int currentGroupRowCount = 0;
Object previousGroupKey = groupChannel != null && inputPages.isEmpty() == false
? BlockUtils.toJavaObject(inputPages.peek().getBlock(groupChannel), 0)
: null;

boolean hasNulls = false;
boolean hasMultivalued = false;
boolean hasIndeterminableChangePoint = false;
boolean tooManyValues = false;
boolean lastGroupHasRows = false;
String indeterminableChangePointReason = "";
for (Page inputPage : inputPages) {
Block inputBlock = inputPage.getBlock(channel);
for (int i = 0; i < inputBlock.getPositionCount() && valuesIndex < valuesCount; i++) {
Block groupBlock = groupChannel != null ? inputPage.getBlock(groupChannel) : null;
for (int i = 0; i < inputBlock.getPositionCount(); i++) {
if (groupBlock != null) {
Object currentGroupKey = BlockUtils.toJavaObject(groupBlock, i);
if (Objects.equals(currentGroupKey, previousGroupKey) == false) {
if (values.isEmpty() == false || lastGroupHasRows) {
var changeType = detectChangePoint(values, bucketIndexes);
var changePointIndex = changeType.changePoint();
if (changePointIndex >= 0) {
detectedChangePoints.offer(new DetectedChangePoint(changePointIndex, changeType));
}
if (changeType instanceof ChangeType.Indeterminable indeterminable) {
hasIndeterminableChangePoint = true;
indeterminableChangePointReason = indeterminable.getReason();
}
values.clear();
bucketIndexes.clear();
lastGroupHasRows = false;
}
previousGroupKey = currentGroupKey;
currentGroupRowCount = 0;
}
}

if (currentGroupRowCount >= INPUT_VALUE_COUNT_LIMIT) {
tooManyValues = true;
valuesIndex++;
continue;
}

Object value = BlockUtils.toJavaObject(inputBlock, i);
lastGroupHasRows = true;
currentGroupRowCount++;
if (value == null) {
hasNulls = true;
valuesIndex++;
Expand All @@ -103,32 +153,47 @@ private void createOutputPages() {
}
}

MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
null,
values.stream().mapToDouble(Double::doubleValue).toArray(),
bucketIndexes.stream().mapToInt(Integer::intValue).toArray()
);
ChangeType changeType = ChangePointDetector.getChangeType(bucketValues);
int changePointIndex = changeType.changePoint();
// flush last (or only) group; for "non-grouped" or "all-null" input this still
// runs to produce an "indeterminable" warning.
if (values.isEmpty() == false || groupChannel == null || lastGroupHasRows) {
var changeType = detectChangePoint(values, bucketIndexes);
var changePointIndex = changeType.changePoint();
if (changePointIndex >= 0) {
detectedChangePoints.offer(new DetectedChangePoint(changePointIndex, changeType));
}
if (changeType instanceof ChangeType.Indeterminable indeterminable) {
hasIndeterminableChangePoint = true;
indeterminableChangePointReason = indeterminable.getReason();
}
}

buildOutputPages(detectedChangePoints);
emitWarnings(tooManyValues, hasNulls, hasMultivalued, hasIndeterminableChangePoint, indeterminableChangePointReason);
}

private void buildOutputPages(ArrayDeque<DetectedChangePoint> detectedChangePoints) {
BlockFactory blockFactory = driverContext.blockFactory();
int pageStartIndex = 0;
while (inputPages.isEmpty() == false) {
Page inputPage = inputPages.peek();
int pageEndIndex = pageStartIndex + inputPage.getPositionCount();
Page outputPage;
Block changeTypeBlock = null;
Block changePvalueBlock = null;
boolean success = false;
try {
if (pageStartIndex <= changePointIndex && changePointIndex < pageStartIndex + inputPage.getPositionCount()) {
DetectedChangePoint head = detectedChangePoints.peek();
if (head != null && head.index() < pageEndIndex) {
try (
BytesRefBlock.Builder changeTypeBlockBuilder = blockFactory.newBytesRefBlockBuilder(inputPage.getPositionCount());
DoubleBlock.Builder pvalueBlockBuilder = blockFactory.newDoubleBlockBuilder(inputPage.getPositionCount())
) {
for (int i = 0; i < inputPage.getPositionCount(); i++) {
if (pageStartIndex + i == changePointIndex) {
changeTypeBlockBuilder.appendBytesRef(new BytesRef(changeType.getWriteableName()));
pvalueBlockBuilder.appendDouble(changeType.pValue());
if (head != null && pageStartIndex + i == head.index()) {
changeTypeBlockBuilder.appendBytesRef(new BytesRef(head.type().getWriteableName()));
pvalueBlockBuilder.appendDouble(head.type().pValue());
detectedChangePoints.poll();
head = detectedChangePoints.peek();
} else {
changeTypeBlockBuilder.appendNull();
pvalueBlockBuilder.appendNull();
Expand All @@ -152,33 +217,33 @@ private void createOutputPages() {

inputPages.removeFirst();
outputPages.add(outputPage);
pageStartIndex += inputPage.getPositionCount();
pageStartIndex = pageEndIndex;
}
}

if (changeType instanceof ChangeType.Indeterminable indeterminable) {
if (logger.isDebugEnabled()) {
logger.debug("Change point indeterminable: {}", indeterminable.getReason());
}
warnings(false).registerException(new IllegalArgumentException(indeterminable.getReason()));
}
private void emitWarnings(
boolean tooManyValues,
boolean hasNulls,
boolean hasMultivalued,
boolean hasIndeterminableChangePoint,
String indeterminableReason
) {
if (tooManyValues) {
if (logger.isDebugEnabled()) {
logger.debug("Too many values: limit is {}, some values were ignored", INPUT_VALUE_COUNT_LIMIT);
}
logger.debug(() -> Strings.format("Too many values: limit is %d, some values were ignored", INPUT_VALUE_COUNT_LIMIT));
warnings(true).registerException(
new IllegalArgumentException("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values")
);
}
if (hasIndeterminableChangePoint) {
logger.debug(() -> Strings.format("Change point indeterminable: %s", indeterminableReason));
warnings(false).registerException(new IllegalArgumentException(indeterminableReason));
}
if (hasNulls) {
if (logger.isDebugEnabled()) {
logger.debug("Values contain nulls; skipping them");
}
logger.debug(() -> "Values contain nulls; skipping them");
warnings(true).registerException(new IllegalArgumentException("values contain nulls; skipping them"));
}
if (hasMultivalued) {
if (logger.isDebugEnabled()) {
logger.debug("Values contain multivalued entries; skipping them");
}
logger.debug(() -> "Values contain multivalued entries; skipping them");
warnings(true).registerException(
new IllegalArgumentException(
"values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)"
Expand Down Expand Up @@ -209,7 +274,11 @@ protected void onClose() {

@Override
public String toString() {
return "ChangePointOperator[channel=" + channel + "]";
if (groupChannel == null) {
return "ChangePointOperator[channel=" + channel + "]";
} else {
return "ChangePointOperator[channel=" + channel + ", groupChannel=" + groupChannel + "]";
}
}

private Warnings warnings(boolean onlyWarnings) {
Expand Down
Loading