diff --git a/docs/reference/query-languages/esql/_snippets/functions/types/first_over_time.md b/docs/reference/query-languages/esql/_snippets/functions/types/first_over_time.md index 1492da5c95e70..9521ef1a95104 100644 --- a/docs/reference/query-languages/esql/_snippets/functions/types/first_over_time.md +++ b/docs/reference/query-languages/esql/_snippets/functions/types/first_over_time.md @@ -11,4 +11,5 @@ | exponential_histogram {applies_to}`stack: preview 9.3.0` | time_duration {applies_to}`stack: preview 9.3.0` | exponential_histogram | | integer | time_duration {applies_to}`stack: preview 9.3.0` | integer | | long | time_duration {applies_to}`stack: preview 9.3.0` | long | +| tdigest {applies_to}`stack: preview 9.4.0` | time_duration {applies_to}`stack: preview 9.3.0` | tdigest | diff --git a/docs/reference/query-languages/esql/_snippets/functions/types/last_over_time.md b/docs/reference/query-languages/esql/_snippets/functions/types/last_over_time.md index 1492da5c95e70..9521ef1a95104 100644 --- a/docs/reference/query-languages/esql/_snippets/functions/types/last_over_time.md +++ b/docs/reference/query-languages/esql/_snippets/functions/types/last_over_time.md @@ -11,4 +11,5 @@ | exponential_histogram {applies_to}`stack: preview 9.3.0` | time_duration {applies_to}`stack: preview 9.3.0` | exponential_histogram | | integer | time_duration {applies_to}`stack: preview 9.3.0` | integer | | long | time_duration {applies_to}`stack: preview 9.3.0` | long | +| tdigest {applies_to}`stack: preview 9.4.0` | time_duration {applies_to}`stack: preview 9.3.0` | tdigest | diff --git a/docs/reference/query-languages/esql/kibana/definition/functions/first_over_time.json b/docs/reference/query-languages/esql/kibana/definition/functions/first_over_time.json index a8bd1f38502b9..d055294e8e8e7 100644 --- a/docs/reference/query-languages/esql/kibana/definition/functions/first_over_time.json +++ b/docs/reference/query-languages/esql/kibana/definition/functions/first_over_time.json @@ -129,6 +129,24 @@ ], "variadic" : false, "returnType" : "long" + }, + { + "params" : [ + { + "name" : "field", + "type" : "tdigest", + "optional" : false, + "description" : "the metric field to calculate the value for" + }, + { + "name" : "window", + "type" : "time_duration", + "optional" : true, + "description" : "the time window over which to compute the first over time value" + } + ], + "variadic" : false, + "returnType" : "tdigest" } ], "examples" : [ diff --git a/docs/reference/query-languages/esql/kibana/definition/functions/last_over_time.json b/docs/reference/query-languages/esql/kibana/definition/functions/last_over_time.json index 311721f505a0f..ddaec4d386fb5 100644 --- a/docs/reference/query-languages/esql/kibana/definition/functions/last_over_time.json +++ b/docs/reference/query-languages/esql/kibana/definition/functions/last_over_time.json @@ -129,6 +129,24 @@ ], "variadic" : false, "returnType" : "long" + }, + { + "params" : [ + { + "name" : "field", + "type" : "tdigest", + "optional" : false, + "description" : "the metric field to calculate the latest value for" + }, + { + "name" : "window", + "type" : "time_duration", + "optional" : true, + "description" : "the time window over which to find the latest value" + } + ], + "variadic" : false, + "returnType" : "tdigest" } ], "examples" : [ diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstTDigestByTimestampAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstTDigestByTimestampAggregatorFunction.java new file mode 100644 index 0000000000000..e63ba9289dd32 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstTDigestByTimestampAggregatorFunction.java @@ -0,0 +1,186 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BooleanBlock; +import org.elasticsearch.compute.data.BooleanVector; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.data.TDigestBlock; +import org.elasticsearch.compute.data.TDigestHolder; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunction} implementation for {@link FirstTDigestByTimestampAggregator}. + * This class is generated. Edit {@code AggregatorImplementer} instead. + */ +public final class FirstTDigestByTimestampAggregatorFunction implements AggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.TDIGEST), + new IntermediateStateDesc("seen", ElementType.BOOLEAN) ); + + private final DriverContext driverContext; + + private final TDigestStates.WithLongSingleState state; + + private final List channels; + + public FirstTDigestByTimestampAggregatorFunction(DriverContext driverContext, + List channels, TDigestStates.WithLongSingleState state) { + this.driverContext = driverContext; + this.channels = channels; + this.state = state; + } + + public static FirstTDigestByTimestampAggregatorFunction create(DriverContext driverContext, + List channels) { + return new FirstTDigestByTimestampAggregatorFunction(driverContext, channels, FirstTDigestByTimestampAggregator.initSingle(driverContext)); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public void addRawInput(Page page, BooleanVector mask) { + if (mask.allFalse()) { + // Entire page masked away + } else if (mask.allTrue()) { + addRawInputNotMasked(page); + } else { + addRawInputMasked(page, mask); + } + } + + private void addRawInputMasked(Page page, BooleanVector mask) { + TDigestBlock tdigestBlock = page.getBlock(channels.get(0)); + LongBlock timestampBlock = page.getBlock(channels.get(1)); + addRawBlock(tdigestBlock, timestampBlock, mask); + } + + private void addRawInputNotMasked(Page page) { + TDigestBlock tdigestBlock = page.getBlock(channels.get(0)); + LongBlock timestampBlock = page.getBlock(channels.get(1)); + addRawBlock(tdigestBlock, timestampBlock); + } + + private void addRawBlock(TDigestBlock tdigestBlock, LongBlock timestampBlock) { + TDigestHolder tdigestScratch = new TDigestHolder(); + for (int p = 0; p < tdigestBlock.getPositionCount(); p++) { + int tdigestValueCount = tdigestBlock.getValueCount(p); + if (tdigestValueCount == 0) { + continue; + } + int timestampValueCount = timestampBlock.getValueCount(p); + if (timestampValueCount == 0) { + continue; + } + int tdigestStart = tdigestBlock.getFirstValueIndex(p); + int tdigestEnd = tdigestStart + tdigestValueCount; + for (int tdigestOffset = tdigestStart; tdigestOffset < tdigestEnd; tdigestOffset++) { + TDigestHolder tdigestValue = tdigestBlock.getTDigestHolder(tdigestOffset, tdigestScratch); + int timestampStart = timestampBlock.getFirstValueIndex(p); + int timestampEnd = timestampStart + timestampValueCount; + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + FirstTDigestByTimestampAggregator.combine(state, tdigestValue, timestampValue); + } + } + } + } + + private void addRawBlock(TDigestBlock tdigestBlock, LongBlock timestampBlock, + BooleanVector mask) { + TDigestHolder tdigestScratch = new TDigestHolder(); + for (int p = 0; p < tdigestBlock.getPositionCount(); p++) { + if (mask.getBoolean(p) == false) { + continue; + } + int tdigestValueCount = tdigestBlock.getValueCount(p); + if (tdigestValueCount == 0) { + continue; + } + int timestampValueCount = timestampBlock.getValueCount(p); + if (timestampValueCount == 0) { + continue; + } + int tdigestStart = tdigestBlock.getFirstValueIndex(p); + int tdigestEnd = tdigestStart + tdigestValueCount; + for (int tdigestOffset = tdigestStart; tdigestOffset < tdigestEnd; tdigestOffset++) { + TDigestHolder tdigestValue = tdigestBlock.getTDigestHolder(tdigestOffset, tdigestScratch); + int timestampStart = timestampBlock.getFirstValueIndex(p); + int timestampEnd = timestampStart + timestampValueCount; + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + FirstTDigestByTimestampAggregator.combine(state, tdigestValue, timestampValue); + } + } + } + } + + @Override + public void addIntermediateInput(Page page) { + assert channels.size() == intermediateBlockCount(); + assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongVector timestamps = ((LongBlock) timestampsUncast).asVector(); + assert timestamps.getPositionCount() == 1; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + TDigestBlock values = (TDigestBlock) valuesUncast; + assert values.getPositionCount() == 1; + Block seenUncast = page.getBlock(channels.get(2)); + if (seenUncast.areAllValuesNull()) { + return; + } + BooleanVector seen = ((BooleanBlock) seenUncast).asVector(); + assert seen.getPositionCount() == 1; + TDigestHolder valuesScratch = new TDigestHolder(); + FirstTDigestByTimestampAggregator.combineIntermediate(state, timestamps.getLong(0), values.getTDigestHolder(values.getFirstValueIndex(0), valuesScratch), seen.getBoolean(0)); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) { + state.toIntermediate(blocks, offset, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, DriverContext driverContext) { + blocks[offset] = FirstTDigestByTimestampAggregator.evaluateFinal(state, driverContext); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstTDigestByTimestampAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstTDigestByTimestampAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..736cbec7856fe --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstTDigestByTimestampAggregatorFunctionSupplier.java @@ -0,0 +1,47 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.util.List; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunctionSupplier} implementation for {@link FirstTDigestByTimestampAggregator}. + * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead. + */ +public final class FirstTDigestByTimestampAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + public FirstTDigestByTimestampAggregatorFunctionSupplier() { + } + + @Override + public List nonGroupingIntermediateStateDesc() { + return FirstTDigestByTimestampAggregatorFunction.intermediateStateDesc(); + } + + @Override + public List groupingIntermediateStateDesc() { + return FirstTDigestByTimestampGroupingAggregatorFunction.intermediateStateDesc(); + } + + @Override + public FirstTDigestByTimestampAggregatorFunction aggregator(DriverContext driverContext, + List channels) { + return FirstTDigestByTimestampAggregatorFunction.create(driverContext, channels); + } + + @Override + public FirstTDigestByTimestampGroupingAggregatorFunction groupingAggregator( + DriverContext driverContext, List channels) { + return FirstTDigestByTimestampGroupingAggregatorFunction.create(channels, driverContext); + } + + @Override + public String describe() { + return FirstTDigestByTimestampAggregator.describe(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstTDigestByTimestampGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstTDigestByTimestampGroupingAggregatorFunction.java new file mode 100644 index 0000000000000..32e5a8052bd90 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/FirstTDigestByTimestampGroupingAggregatorFunction.java @@ -0,0 +1,320 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BooleanBlock; +import org.elasticsearch.compute.data.BooleanVector; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntArrayBlock; +import org.elasticsearch.compute.data.IntBigArrayBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.data.TDigestBlock; +import org.elasticsearch.compute.data.TDigestHolder; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link FirstTDigestByTimestampAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. + */ +public final class FirstTDigestByTimestampGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.TDIGEST), + new IntermediateStateDesc("seen", ElementType.BOOLEAN) ); + + private final TDigestStates.WithLongGroupingState state; + + private final List channels; + + private final DriverContext driverContext; + + public FirstTDigestByTimestampGroupingAggregatorFunction(List channels, + TDigestStates.WithLongGroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static FirstTDigestByTimestampGroupingAggregatorFunction create(List channels, + DriverContext driverContext) { + return new FirstTDigestByTimestampGroupingAggregatorFunction(channels, FirstTDigestByTimestampAggregator.initGrouping(driverContext), driverContext); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessRawInputPage(SeenGroupIds seenGroupIds, + Page page) { + TDigestBlock tdigestBlock = page.getBlock(channels.get(0)); + LongBlock timestampBlock = page.getBlock(channels.get(1)); + maybeEnableGroupIdTracking(seenGroupIds, tdigestBlock, timestampBlock); + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, tdigestBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, tdigestBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, tdigestBlock, timestampBlock); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntArrayBlock groups, TDigestBlock tdigestBlock, + LongBlock timestampBlock) { + TDigestHolder tdigestScratch = new TDigestHolder(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + if (tdigestBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int tdigestStart = tdigestBlock.getFirstValueIndex(valuesPosition); + int tdigestEnd = tdigestStart + tdigestBlock.getValueCount(valuesPosition); + for (int tdigestOffset = tdigestStart; tdigestOffset < tdigestEnd; tdigestOffset++) { + TDigestHolder tdigestValue = tdigestBlock.getTDigestHolder(tdigestOffset, tdigestScratch); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + FirstTDigestByTimestampAggregator.combine(state, groupId, tdigestValue, timestampValue); + } + } + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntArrayBlock groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongVector timestamps = ((LongBlock) timestampsUncast).asVector(); + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + TDigestBlock values = (TDigestBlock) valuesUncast; + Block seenUncast = page.getBlock(channels.get(2)); + if (seenUncast.areAllValuesNull()) { + return; + } + BooleanVector seen = ((BooleanBlock) seenUncast).asVector(); + assert timestamps.getPositionCount() == values.getPositionCount() && timestamps.getPositionCount() == seen.getPositionCount(); + TDigestHolder valuesScratch = new TDigestHolder(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valuesPosition = groupPosition + positionOffset; + FirstTDigestByTimestampAggregator.combineIntermediate(state, groupId, timestamps.getLong(valuesPosition), values.getTDigestHolder(values.getFirstValueIndex(valuesPosition), valuesScratch), seen.getBoolean(valuesPosition)); + } + } + } + + private void addRawInput(int positionOffset, IntBigArrayBlock groups, TDigestBlock tdigestBlock, + LongBlock timestampBlock) { + TDigestHolder tdigestScratch = new TDigestHolder(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + if (tdigestBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int tdigestStart = tdigestBlock.getFirstValueIndex(valuesPosition); + int tdigestEnd = tdigestStart + tdigestBlock.getValueCount(valuesPosition); + for (int tdigestOffset = tdigestStart; tdigestOffset < tdigestEnd; tdigestOffset++) { + TDigestHolder tdigestValue = tdigestBlock.getTDigestHolder(tdigestOffset, tdigestScratch); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + FirstTDigestByTimestampAggregator.combine(state, groupId, tdigestValue, timestampValue); + } + } + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntBigArrayBlock groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongVector timestamps = ((LongBlock) timestampsUncast).asVector(); + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + TDigestBlock values = (TDigestBlock) valuesUncast; + Block seenUncast = page.getBlock(channels.get(2)); + if (seenUncast.areAllValuesNull()) { + return; + } + BooleanVector seen = ((BooleanBlock) seenUncast).asVector(); + assert timestamps.getPositionCount() == values.getPositionCount() && timestamps.getPositionCount() == seen.getPositionCount(); + TDigestHolder valuesScratch = new TDigestHolder(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valuesPosition = groupPosition + positionOffset; + FirstTDigestByTimestampAggregator.combineIntermediate(state, groupId, timestamps.getLong(valuesPosition), values.getTDigestHolder(values.getFirstValueIndex(valuesPosition), valuesScratch), seen.getBoolean(valuesPosition)); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, TDigestBlock tdigestBlock, + LongBlock timestampBlock) { + TDigestHolder tdigestScratch = new TDigestHolder(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int valuesPosition = groupPosition + positionOffset; + if (tdigestBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupId = groups.getInt(groupPosition); + int tdigestStart = tdigestBlock.getFirstValueIndex(valuesPosition); + int tdigestEnd = tdigestStart + tdigestBlock.getValueCount(valuesPosition); + for (int tdigestOffset = tdigestStart; tdigestOffset < tdigestEnd; tdigestOffset++) { + TDigestHolder tdigestValue = tdigestBlock.getTDigestHolder(tdigestOffset, tdigestScratch); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + FirstTDigestByTimestampAggregator.combine(state, groupId, tdigestValue, timestampValue); + } + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongVector timestamps = ((LongBlock) timestampsUncast).asVector(); + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + TDigestBlock values = (TDigestBlock) valuesUncast; + Block seenUncast = page.getBlock(channels.get(2)); + if (seenUncast.areAllValuesNull()) { + return; + } + BooleanVector seen = ((BooleanBlock) seenUncast).asVector(); + assert timestamps.getPositionCount() == values.getPositionCount() && timestamps.getPositionCount() == seen.getPositionCount(); + TDigestHolder valuesScratch = new TDigestHolder(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + int valuesPosition = groupPosition + positionOffset; + FirstTDigestByTimestampAggregator.combineIntermediate(state, groupId, timestamps.getLong(valuesPosition), values.getTDigestHolder(values.getFirstValueIndex(valuesPosition), valuesScratch), seen.getBoolean(valuesPosition)); + } + } + + private void maybeEnableGroupIdTracking(SeenGroupIds seenGroupIds, TDigestBlock tdigestBlock, + LongBlock timestampBlock) { + if (tdigestBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + if (timestampBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext ctx) { + blocks[offset] = FirstTDigestByTimestampAggregator.evaluateFinal(state, selected, ctx); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastTDigestByTimestampAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastTDigestByTimestampAggregatorFunction.java new file mode 100644 index 0000000000000..7668824d33510 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastTDigestByTimestampAggregatorFunction.java @@ -0,0 +1,186 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BooleanBlock; +import org.elasticsearch.compute.data.BooleanVector; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.data.TDigestBlock; +import org.elasticsearch.compute.data.TDigestHolder; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunction} implementation for {@link LastTDigestByTimestampAggregator}. + * This class is generated. Edit {@code AggregatorImplementer} instead. + */ +public final class LastTDigestByTimestampAggregatorFunction implements AggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.TDIGEST), + new IntermediateStateDesc("seen", ElementType.BOOLEAN) ); + + private final DriverContext driverContext; + + private final TDigestStates.WithLongSingleState state; + + private final List channels; + + public LastTDigestByTimestampAggregatorFunction(DriverContext driverContext, + List channels, TDigestStates.WithLongSingleState state) { + this.driverContext = driverContext; + this.channels = channels; + this.state = state; + } + + public static LastTDigestByTimestampAggregatorFunction create(DriverContext driverContext, + List channels) { + return new LastTDigestByTimestampAggregatorFunction(driverContext, channels, LastTDigestByTimestampAggregator.initSingle(driverContext)); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public void addRawInput(Page page, BooleanVector mask) { + if (mask.allFalse()) { + // Entire page masked away + } else if (mask.allTrue()) { + addRawInputNotMasked(page); + } else { + addRawInputMasked(page, mask); + } + } + + private void addRawInputMasked(Page page, BooleanVector mask) { + TDigestBlock tdigestBlock = page.getBlock(channels.get(0)); + LongBlock timestampBlock = page.getBlock(channels.get(1)); + addRawBlock(tdigestBlock, timestampBlock, mask); + } + + private void addRawInputNotMasked(Page page) { + TDigestBlock tdigestBlock = page.getBlock(channels.get(0)); + LongBlock timestampBlock = page.getBlock(channels.get(1)); + addRawBlock(tdigestBlock, timestampBlock); + } + + private void addRawBlock(TDigestBlock tdigestBlock, LongBlock timestampBlock) { + TDigestHolder tdigestScratch = new TDigestHolder(); + for (int p = 0; p < tdigestBlock.getPositionCount(); p++) { + int tdigestValueCount = tdigestBlock.getValueCount(p); + if (tdigestValueCount == 0) { + continue; + } + int timestampValueCount = timestampBlock.getValueCount(p); + if (timestampValueCount == 0) { + continue; + } + int tdigestStart = tdigestBlock.getFirstValueIndex(p); + int tdigestEnd = tdigestStart + tdigestValueCount; + for (int tdigestOffset = tdigestStart; tdigestOffset < tdigestEnd; tdigestOffset++) { + TDigestHolder tdigestValue = tdigestBlock.getTDigestHolder(tdigestOffset, tdigestScratch); + int timestampStart = timestampBlock.getFirstValueIndex(p); + int timestampEnd = timestampStart + timestampValueCount; + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + LastTDigestByTimestampAggregator.combine(state, tdigestValue, timestampValue); + } + } + } + } + + private void addRawBlock(TDigestBlock tdigestBlock, LongBlock timestampBlock, + BooleanVector mask) { + TDigestHolder tdigestScratch = new TDigestHolder(); + for (int p = 0; p < tdigestBlock.getPositionCount(); p++) { + if (mask.getBoolean(p) == false) { + continue; + } + int tdigestValueCount = tdigestBlock.getValueCount(p); + if (tdigestValueCount == 0) { + continue; + } + int timestampValueCount = timestampBlock.getValueCount(p); + if (timestampValueCount == 0) { + continue; + } + int tdigestStart = tdigestBlock.getFirstValueIndex(p); + int tdigestEnd = tdigestStart + tdigestValueCount; + for (int tdigestOffset = tdigestStart; tdigestOffset < tdigestEnd; tdigestOffset++) { + TDigestHolder tdigestValue = tdigestBlock.getTDigestHolder(tdigestOffset, tdigestScratch); + int timestampStart = timestampBlock.getFirstValueIndex(p); + int timestampEnd = timestampStart + timestampValueCount; + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + LastTDigestByTimestampAggregator.combine(state, tdigestValue, timestampValue); + } + } + } + } + + @Override + public void addIntermediateInput(Page page) { + assert channels.size() == intermediateBlockCount(); + assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongVector timestamps = ((LongBlock) timestampsUncast).asVector(); + assert timestamps.getPositionCount() == 1; + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + TDigestBlock values = (TDigestBlock) valuesUncast; + assert values.getPositionCount() == 1; + Block seenUncast = page.getBlock(channels.get(2)); + if (seenUncast.areAllValuesNull()) { + return; + } + BooleanVector seen = ((BooleanBlock) seenUncast).asVector(); + assert seen.getPositionCount() == 1; + TDigestHolder valuesScratch = new TDigestHolder(); + LastTDigestByTimestampAggregator.combineIntermediate(state, timestamps.getLong(0), values.getTDigestHolder(values.getFirstValueIndex(0), valuesScratch), seen.getBoolean(0)); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, DriverContext driverContext) { + state.toIntermediate(blocks, offset, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, DriverContext driverContext) { + blocks[offset] = LastTDigestByTimestampAggregator.evaluateFinal(state, driverContext); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastTDigestByTimestampAggregatorFunctionSupplier.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastTDigestByTimestampAggregatorFunctionSupplier.java new file mode 100644 index 0000000000000..f0895e8490edc --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastTDigestByTimestampAggregatorFunctionSupplier.java @@ -0,0 +1,47 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.util.List; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link AggregatorFunctionSupplier} implementation for {@link LastTDigestByTimestampAggregator}. + * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead. + */ +public final class LastTDigestByTimestampAggregatorFunctionSupplier implements AggregatorFunctionSupplier { + public LastTDigestByTimestampAggregatorFunctionSupplier() { + } + + @Override + public List nonGroupingIntermediateStateDesc() { + return LastTDigestByTimestampAggregatorFunction.intermediateStateDesc(); + } + + @Override + public List groupingIntermediateStateDesc() { + return LastTDigestByTimestampGroupingAggregatorFunction.intermediateStateDesc(); + } + + @Override + public LastTDigestByTimestampAggregatorFunction aggregator(DriverContext driverContext, + List channels) { + return LastTDigestByTimestampAggregatorFunction.create(driverContext, channels); + } + + @Override + public LastTDigestByTimestampGroupingAggregatorFunction groupingAggregator( + DriverContext driverContext, List channels) { + return LastTDigestByTimestampGroupingAggregatorFunction.create(channels, driverContext); + } + + @Override + public String describe() { + return LastTDigestByTimestampAggregator.describe(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastTDigestByTimestampGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastTDigestByTimestampGroupingAggregatorFunction.java new file mode 100644 index 0000000000000..2c78386fd1e16 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LastTDigestByTimestampGroupingAggregatorFunction.java @@ -0,0 +1,320 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License +// 2.0; you may not use this file except in compliance with the Elastic License +// 2.0. +package org.elasticsearch.compute.aggregation; + +import java.lang.Integer; +import java.lang.Override; +import java.lang.String; +import java.lang.StringBuilder; +import java.util.List; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BooleanBlock; +import org.elasticsearch.compute.data.BooleanVector; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntArrayBlock; +import org.elasticsearch.compute.data.IntBigArrayBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.data.TDigestBlock; +import org.elasticsearch.compute.data.TDigestHolder; +import org.elasticsearch.compute.operator.DriverContext; + +/** + * {@link GroupingAggregatorFunction} implementation for {@link LastTDigestByTimestampAggregator}. + * This class is generated. Edit {@code GroupingAggregatorImplementer} instead. + */ +public final class LastTDigestByTimestampGroupingAggregatorFunction implements GroupingAggregatorFunction { + private static final List INTERMEDIATE_STATE_DESC = List.of( + new IntermediateStateDesc("timestamps", ElementType.LONG), + new IntermediateStateDesc("values", ElementType.TDIGEST), + new IntermediateStateDesc("seen", ElementType.BOOLEAN) ); + + private final TDigestStates.WithLongGroupingState state; + + private final List channels; + + private final DriverContext driverContext; + + public LastTDigestByTimestampGroupingAggregatorFunction(List channels, + TDigestStates.WithLongGroupingState state, DriverContext driverContext) { + this.channels = channels; + this.state = state; + this.driverContext = driverContext; + } + + public static LastTDigestByTimestampGroupingAggregatorFunction create(List channels, + DriverContext driverContext) { + return new LastTDigestByTimestampGroupingAggregatorFunction(channels, LastTDigestByTimestampAggregator.initGrouping(driverContext), driverContext); + } + + public static List intermediateStateDesc() { + return INTERMEDIATE_STATE_DESC; + } + + @Override + public int intermediateBlockCount() { + return INTERMEDIATE_STATE_DESC.size(); + } + + @Override + public GroupingAggregatorFunction.AddInput prepareProcessRawInputPage(SeenGroupIds seenGroupIds, + Page page) { + TDigestBlock tdigestBlock = page.getBlock(channels.get(0)); + LongBlock timestampBlock = page.getBlock(channels.get(1)); + maybeEnableGroupIdTracking(seenGroupIds, tdigestBlock, timestampBlock); + return new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, tdigestBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) { + addRawInput(positionOffset, groupIds, tdigestBlock, timestampBlock); + } + + @Override + public void add(int positionOffset, IntVector groupIds) { + addRawInput(positionOffset, groupIds, tdigestBlock, timestampBlock); + } + + @Override + public void close() { + } + }; + } + + private void addRawInput(int positionOffset, IntArrayBlock groups, TDigestBlock tdigestBlock, + LongBlock timestampBlock) { + TDigestHolder tdigestScratch = new TDigestHolder(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + if (tdigestBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int tdigestStart = tdigestBlock.getFirstValueIndex(valuesPosition); + int tdigestEnd = tdigestStart + tdigestBlock.getValueCount(valuesPosition); + for (int tdigestOffset = tdigestStart; tdigestOffset < tdigestEnd; tdigestOffset++) { + TDigestHolder tdigestValue = tdigestBlock.getTDigestHolder(tdigestOffset, tdigestScratch); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + LastTDigestByTimestampAggregator.combine(state, groupId, tdigestValue, timestampValue); + } + } + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntArrayBlock groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongVector timestamps = ((LongBlock) timestampsUncast).asVector(); + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + TDigestBlock values = (TDigestBlock) valuesUncast; + Block seenUncast = page.getBlock(channels.get(2)); + if (seenUncast.areAllValuesNull()) { + return; + } + BooleanVector seen = ((BooleanBlock) seenUncast).asVector(); + assert timestamps.getPositionCount() == values.getPositionCount() && timestamps.getPositionCount() == seen.getPositionCount(); + TDigestHolder valuesScratch = new TDigestHolder(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valuesPosition = groupPosition + positionOffset; + LastTDigestByTimestampAggregator.combineIntermediate(state, groupId, timestamps.getLong(valuesPosition), values.getTDigestHolder(values.getFirstValueIndex(valuesPosition), valuesScratch), seen.getBoolean(valuesPosition)); + } + } + } + + private void addRawInput(int positionOffset, IntBigArrayBlock groups, TDigestBlock tdigestBlock, + LongBlock timestampBlock) { + TDigestHolder tdigestScratch = new TDigestHolder(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int valuesPosition = groupPosition + positionOffset; + if (tdigestBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int tdigestStart = tdigestBlock.getFirstValueIndex(valuesPosition); + int tdigestEnd = tdigestStart + tdigestBlock.getValueCount(valuesPosition); + for (int tdigestOffset = tdigestStart; tdigestOffset < tdigestEnd; tdigestOffset++) { + TDigestHolder tdigestValue = tdigestBlock.getTDigestHolder(tdigestOffset, tdigestScratch); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + LastTDigestByTimestampAggregator.combine(state, groupId, tdigestValue, timestampValue); + } + } + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntBigArrayBlock groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongVector timestamps = ((LongBlock) timestampsUncast).asVector(); + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + TDigestBlock values = (TDigestBlock) valuesUncast; + Block seenUncast = page.getBlock(channels.get(2)); + if (seenUncast.areAllValuesNull()) { + return; + } + BooleanVector seen = ((BooleanBlock) seenUncast).asVector(); + assert timestamps.getPositionCount() == values.getPositionCount() && timestamps.getPositionCount() == seen.getPositionCount(); + TDigestHolder valuesScratch = new TDigestHolder(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + if (groups.isNull(groupPosition)) { + continue; + } + int groupStart = groups.getFirstValueIndex(groupPosition); + int groupEnd = groupStart + groups.getValueCount(groupPosition); + for (int g = groupStart; g < groupEnd; g++) { + int groupId = groups.getInt(g); + int valuesPosition = groupPosition + positionOffset; + LastTDigestByTimestampAggregator.combineIntermediate(state, groupId, timestamps.getLong(valuesPosition), values.getTDigestHolder(values.getFirstValueIndex(valuesPosition), valuesScratch), seen.getBoolean(valuesPosition)); + } + } + } + + private void addRawInput(int positionOffset, IntVector groups, TDigestBlock tdigestBlock, + LongBlock timestampBlock) { + TDigestHolder tdigestScratch = new TDigestHolder(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int valuesPosition = groupPosition + positionOffset; + if (tdigestBlock.isNull(valuesPosition)) { + continue; + } + if (timestampBlock.isNull(valuesPosition)) { + continue; + } + int groupId = groups.getInt(groupPosition); + int tdigestStart = tdigestBlock.getFirstValueIndex(valuesPosition); + int tdigestEnd = tdigestStart + tdigestBlock.getValueCount(valuesPosition); + for (int tdigestOffset = tdigestStart; tdigestOffset < tdigestEnd; tdigestOffset++) { + TDigestHolder tdigestValue = tdigestBlock.getTDigestHolder(tdigestOffset, tdigestScratch); + int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition); + int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition); + for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) { + long timestampValue = timestampBlock.getLong(timestampOffset); + LastTDigestByTimestampAggregator.combine(state, groupId, tdigestValue, timestampValue); + } + } + } + } + + @Override + public void addIntermediateInput(int positionOffset, IntVector groups, Page page) { + state.enableGroupIdTracking(new SeenGroupIds.Empty()); + assert channels.size() == intermediateBlockCount(); + Block timestampsUncast = page.getBlock(channels.get(0)); + if (timestampsUncast.areAllValuesNull()) { + return; + } + LongVector timestamps = ((LongBlock) timestampsUncast).asVector(); + Block valuesUncast = page.getBlock(channels.get(1)); + if (valuesUncast.areAllValuesNull()) { + return; + } + TDigestBlock values = (TDigestBlock) valuesUncast; + Block seenUncast = page.getBlock(channels.get(2)); + if (seenUncast.areAllValuesNull()) { + return; + } + BooleanVector seen = ((BooleanBlock) seenUncast).asVector(); + assert timestamps.getPositionCount() == values.getPositionCount() && timestamps.getPositionCount() == seen.getPositionCount(); + TDigestHolder valuesScratch = new TDigestHolder(); + for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { + int groupId = groups.getInt(groupPosition); + int valuesPosition = groupPosition + positionOffset; + LastTDigestByTimestampAggregator.combineIntermediate(state, groupId, timestamps.getLong(valuesPosition), values.getTDigestHolder(values.getFirstValueIndex(valuesPosition), valuesScratch), seen.getBoolean(valuesPosition)); + } + } + + private void maybeEnableGroupIdTracking(SeenGroupIds seenGroupIds, TDigestBlock tdigestBlock, + LongBlock timestampBlock) { + if (tdigestBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + if (timestampBlock.mayHaveNulls()) { + state.enableGroupIdTracking(seenGroupIds); + } + } + + @Override + public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) { + state.enableGroupIdTracking(seenGroupIds); + } + + @Override + public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) { + state.toIntermediate(blocks, offset, selected, driverContext); + } + + @Override + public void evaluateFinal(Block[] blocks, int offset, IntVector selected, + GroupingAggregatorEvaluationContext ctx) { + blocks[offset] = LastTDigestByTimestampAggregator.evaluateFinal(state, selected, ctx); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getSimpleName()).append("["); + sb.append("channels=").append(channels); + sb.append("]"); + return sb.toString(); + } + + @Override + public void close() { + state.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/FirstExponentialHistogramByTimestampAggregator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/FirstExponentialHistogramByTimestampAggregator.java index 95f1feba45046..118b58eb6e487 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/FirstExponentialHistogramByTimestampAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/FirstExponentialHistogramByTimestampAggregator.java @@ -35,7 +35,7 @@ public static ExponentialHistogramStates.WithLongSingleState initSingle(DriverCo } public static void combine(ExponentialHistogramStates.WithLongSingleState current, ExponentialHistogram value, long timestamp) { - if (timestamp < current.longValue()) { + if (current.isSeen() == false || timestamp < current.longValue()) { current.set(timestamp, value); } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/FirstTDigestByTimestampAggregator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/FirstTDigestByTimestampAggregator.java new file mode 100644 index 0000000000000..62f5c09c6d337 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/FirstTDigestByTimestampAggregator.java @@ -0,0 +1,83 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +import org.elasticsearch.compute.ann.Aggregator; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.TDigestHolder; +import org.elasticsearch.compute.operator.DriverContext; + +@Aggregator( + { + @IntermediateState(name = "timestamps", type = "LONG"), + @IntermediateState(name = "values", type = "TDIGEST"), + @IntermediateState(name = "seen", type = "BOOLEAN") } +) +@GroupingAggregator +public class FirstTDigestByTimestampAggregator { + public static String describe() { + return "first_TDigest_by_timestamp"; + } + + public static TDigestStates.WithLongSingleState initSingle(DriverContext driverContext) { + return new TDigestStates.WithLongSingleState(driverContext.breaker()); + } + + public static void combine(TDigestStates.WithLongSingleState current, TDigestHolder tdigest, long timestamp) { + if (current.isSeen() == false || timestamp < current.longValue()) { + current.set(timestamp, tdigest); + } + } + + public static void combineIntermediate(TDigestStates.WithLongSingleState current, long timestamp, TDigestHolder tdigest, boolean seen) { + if (seen) { + if (current.isSeen()) { + combine(current, tdigest, timestamp); + } else { + current.set(timestamp, tdigest); + } + } + } + + public static Block evaluateFinal(TDigestStates.WithLongSingleState current, DriverContext ctx) { + return current.evaluateFinalTDigest(ctx); + } + + public static TDigestStates.WithLongGroupingState initGrouping(DriverContext driverContext) { + return new TDigestStates.WithLongGroupingState(driverContext.bigArrays(), driverContext.breaker()); + } + + public static void combine(TDigestStates.WithLongGroupingState current, int groupId, TDigestHolder tdigest, long timestamp) { + if (current.seen(groupId) == false || timestamp < current.longValue(groupId)) { + current.set(groupId, timestamp, tdigest); + } + } + + public static void combineIntermediate( + TDigestStates.WithLongGroupingState current, + int groupId, + long timestamp, + TDigestHolder tdigest, + boolean seen + ) { + if (seen) { + combine(current, groupId, tdigest, timestamp); + } + } + + public static Block evaluateFinal( + TDigestStates.WithLongGroupingState state, + IntVector selected, + GroupingAggregatorEvaluationContext ctx + ) { + return state.evaluateFinalTDigests(selected, ctx.driverContext()); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/LastExponentialHistogramByTimestampAggregator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/LastExponentialHistogramByTimestampAggregator.java index 1be309147d450..d19c1fc3492e4 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/LastExponentialHistogramByTimestampAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/LastExponentialHistogramByTimestampAggregator.java @@ -36,7 +36,7 @@ public static ExponentialHistogramStates.WithLongSingleState initSingle(DriverCo } public static void combine(ExponentialHistogramStates.WithLongSingleState current, ExponentialHistogram value, long timestamp) { - if (timestamp > current.longValue()) { + if (current.isSeen() == false || timestamp > current.longValue()) { current.set(timestamp, value); } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/LastTDigestByTimestampAggregator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/LastTDigestByTimestampAggregator.java new file mode 100644 index 0000000000000..bb13cd6b174e8 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/LastTDigestByTimestampAggregator.java @@ -0,0 +1,83 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +import org.elasticsearch.compute.ann.Aggregator; +import org.elasticsearch.compute.ann.GroupingAggregator; +import org.elasticsearch.compute.ann.IntermediateState; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.TDigestHolder; +import org.elasticsearch.compute.operator.DriverContext; + +@Aggregator( + { + @IntermediateState(name = "timestamps", type = "LONG"), + @IntermediateState(name = "values", type = "TDIGEST"), + @IntermediateState(name = "seen", type = "BOOLEAN") } +) +@GroupingAggregator +public class LastTDigestByTimestampAggregator { + public static String describe() { + return "last_TDigest_by_timestamp"; + } + + public static TDigestStates.WithLongSingleState initSingle(DriverContext driverContext) { + return new TDigestStates.WithLongSingleState(driverContext.breaker()); + } + + public static void combine(TDigestStates.WithLongSingleState current, TDigestHolder tdigest, long timestamp) { + if (current.isSeen() == false || timestamp > current.longValue()) { + current.set(timestamp, tdigest); + } + } + + public static void combineIntermediate(TDigestStates.WithLongSingleState current, long timestamp, TDigestHolder tdigest, boolean seen) { + if (seen) { + if (current.isSeen()) { + combine(current, tdigest, timestamp); + } else { + current.set(timestamp, tdigest); + } + } + } + + public static Block evaluateFinal(TDigestStates.WithLongSingleState current, DriverContext ctx) { + return current.evaluateFinalTDigest(ctx); + } + + public static TDigestStates.WithLongGroupingState initGrouping(DriverContext driverContext) { + return new TDigestStates.WithLongGroupingState(driverContext.bigArrays(), driverContext.breaker()); + } + + public static void combine(TDigestStates.WithLongGroupingState current, int groupId, TDigestHolder tdigest, long timestamp) { + if (current.seen(groupId) == false || timestamp > current.longValue(groupId)) { + current.set(groupId, timestamp, tdigest); + } + } + + public static void combineIntermediate( + TDigestStates.WithLongGroupingState current, + int groupId, + long timestamp, + TDigestHolder value, + boolean seen + ) { + if (seen) { + combine(current, groupId, value, timestamp); + } + } + + public static Block evaluateFinal( + TDigestStates.WithLongGroupingState state, + IntVector selected, + GroupingAggregatorEvaluationContext ctx + ) { + return state.evaluateFinalTDigests(selected, ctx.driverContext()); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/TDigestStates.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/TDigestStates.java index 4bb26816e8b49..faa64bd923a78 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/TDigestStates.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/TDigestStates.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.DoubleArray; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; @@ -19,7 +20,6 @@ import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.core.Releasables; import org.elasticsearch.search.aggregations.metrics.MemoryTrackingTDigestArrays; -import org.elasticsearch.search.aggregations.metrics.TDigestExecutionHint; import org.elasticsearch.tdigest.TDigest; import java.util.function.DoubleBinaryOperator; @@ -27,7 +27,6 @@ public final class TDigestStates { // Currently we use the same defaults as for queryDSL, we might make this configurable later - private static final TDigestExecutionHint EXECUTION_HINT = TDigestExecutionHint.DEFAULT; public static final double COMPRESSION = 100.0; private TDigestStates() {} @@ -243,4 +242,179 @@ public void enableGroupIdTracking(SeenGroupIds seenGroupIds) { // noop - we handle the null states inside `toIntermediate` and `evaluateFinal` } } + + /** + * A state consisting of a single {@code long} value with a {@link TDigestHolder}. + * The intermediate state contains three values in order: the long, the digest, and a boolean specifying if a value was set or not. + */ + public static final class WithLongSingleState implements AggregatorState { + + private final CircuitBreaker breaker; + private long longValue; + private BreakingTDigestHolder value; + + public WithLongSingleState(CircuitBreaker breaker) { + this.breaker = breaker; + } + + public boolean isSeen() { + return value != null; + } + + public long longValue() { + assert isSeen(); + return longValue; + } + + public void set(long longValue, TDigestHolder digestValue) { + assert digestValue != null; + this.longValue = longValue; + if (value == null) { + value = BreakingTDigestHolder.create(breaker); + } + value.set(digestValue); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, DriverContext driverContext) { + assert blocks.length >= offset + 3; + BlockFactory blockFactory = driverContext.blockFactory(); + // in case of error, the blocks are closed by the caller + if (value == null) { + blocks[offset] = blockFactory.newConstantLongBlockWith(0L, 1); + blocks[offset + 1] = blockFactory.newConstantTDigestBlock(TDigestHolder.empty(), 1); + blocks[offset + 2] = blockFactory.newConstantBooleanBlockWith(false, 1); + } else { + blocks[offset] = blockFactory.newConstantLongBlockWith(longValue, 1); + blocks[offset + 1] = blockFactory.newConstantTDigestBlock(value.accessor(), 1); + blocks[offset + 2] = blockFactory.newConstantBooleanBlockWith(true, 1); + } + } + + public Block evaluateFinalTDigest(DriverContext driverContext) { + BlockFactory blockFactory = driverContext.blockFactory(); + if (value == null) { + return blockFactory.newConstantNullBlock(1); + } else { + return blockFactory.newConstantTDigestBlock(value.accessor(), 1); + } + } + + @Override + public void close() { + Releasables.close(value); + value = null; + } + } + + /** + * A grouping state consisting of a single {@code long} value with a {@link TDigestHolder} per group. + * The intermediate state contains three values in order: the long, the digest, and a boolean specifying if a value was set or not. + */ + public static final class WithLongGroupingState implements GroupingAggregatorState { + private LongArray longValues; + private ObjectArray values; + private final CircuitBreaker breaker; + private final BigArrays bigArrays; + + WithLongGroupingState(BigArrays bigArrays, CircuitBreaker breaker) { + LongArray longValues = null; + ObjectArray values = null; + boolean success = false; + try { + longValues = bigArrays.newLongArray(1); + values = bigArrays.newObjectArray(1); + success = true; + } finally { + if (success == false) { + Releasables.close(values, longValues); + } + } + this.longValues = longValues; + this.values = values; + this.bigArrays = bigArrays; + this.breaker = breaker; + } + + public void set(int groupId, long longValue, TDigestHolder digestValue) { + assert digestValue != null; + ensureCapacity(groupId); + BreakingTDigestHolder value = values.get(groupId); + if (value == null) { + value = BreakingTDigestHolder.create(breaker); + values.set(groupId, value); + } + value.set(digestValue); + longValues.set(groupId, longValue); + } + + private void ensureCapacity(int groupId) { + values = bigArrays.grow(values, groupId + 1); + longValues = bigArrays.grow(longValues, groupId + 1); + } + + @Override + public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { + assert blocks.length >= offset + 3; + try ( + var longBuilder = driverContext.blockFactory().newLongVectorFixedBuilder(selected.getPositionCount()); + var valueBuilder = driverContext.blockFactory().newTDigestBlockBuilder(selected.getPositionCount()); + var seenBuilder = driverContext.blockFactory().newBooleanVectorFixedBuilder(selected.getPositionCount()); + ) { + for (int i = 0; i < selected.getPositionCount(); i++) { + int groupId = selected.getInt(i); + if (seen(groupId)) { + seenBuilder.appendBoolean(true); + longBuilder.appendLong(longValues.get(groupId)); + valueBuilder.appendTDigest(values.get(groupId).accessor()); + } else { + seenBuilder.appendBoolean(false); + longBuilder.appendLong(0L); + valueBuilder.appendTDigest(TDigestHolder.empty()); + } + } + blocks[offset] = longBuilder.build().asBlock(); + blocks[offset + 1] = valueBuilder.build(); + blocks[offset + 2] = seenBuilder.build().asBlock(); + } + } + + public boolean seen(int groupId) { + return groupId < values.size() && values.get(groupId) != null; + } + + public long longValue(int groupId) { + assert seen(groupId); + return longValues.get(groupId); + } + + @Override + public void close() { + for (int i = 0; i < values.size(); i++) { + Releasables.close(values.get(i)); + } + Releasables.close(values, longValues); + values = null; + longValues = null; + } + + public Block evaluateFinalTDigests(IntVector selected, DriverContext driverContext) { + try (var builder = driverContext.blockFactory().newTDigestBlockBuilder(selected.getPositionCount())) { + for (int i = 0; i < selected.getPositionCount(); i++) { + int groupId = selected.getInt(i); + if (seen(groupId)) { + builder.appendTDigest(values.get(groupId).accessor()); + } else { + builder.appendNull(); + } + } + return builder.build(); + } + } + + @Override + public void enableGroupIdTracking(SeenGroupIds seenGroupIds) { + // noop + } + } } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/histogram.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/histogram.csv-spec index e089e5447897e..d3abac3eec213 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/histogram.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/histogram.csv-spec @@ -489,6 +489,24 @@ time:datetime | count:long // ------------------------------------------------------------------------------- +Histogram converted to tdigest in first_over_time and last_over_time +required_capability: ts_command_v0 +required_capability: tdigest_first_last_over_time + +TS histogram_timeseries_index + | WHERE NOT STARTS_WITH(instance, "hand-rolled") + | STATS first_count = COUNT(first_over_time(to_tdigest(responseTime))), + last_count = COUNT(last_over_time(to_tdigest(responseTime))) + BY time = TBUCKET(1d) + | KEEP time, first_count, last_count +; + +time:datetime | first_count:long | last_count:long +2025-09-25T00:00:00.000Z | 135 | 142 +; + +// ------------------------------------------------------------------------------- + // this specifically tests the interaction with PushCountQueryAndTagsToSource rule Top level date group with no other groups (timeseries index) required_capability: ts_command_v0 diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/tdigest.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/tdigest.csv-spec index 3b7b7e7b74334..c3a3eb55d586d 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/tdigest.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/tdigest.csv-spec @@ -660,3 +660,22 @@ time:datetime | count:long 2025-09-25T00:50:00.000Z | 1451 2025-09-25T01:00:00.000Z | 147 ; + +// ------------------------------------------------------------------------------- + +timeseriesLastAndFirstOverTimeDirectTDigest +required_capability: ts_command_v0 +required_capability: tdigest_first_last_over_time + +TS tdigest_timeseries_index + | WHERE NOT STARTS_WITH(instance, "hand-rolled") + | STATS first_count = COUNT(first_over_time(responseTime)), + last_count = COUNT(last_over_time(responseTime)) + BY time = TBUCKET(1d) + | KEEP time, first_count, last_count + | SORT time +; + +time:datetime | first_count:long | last_count:long +2025-09-25T00:00:00.000Z | 135 | 142 +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 339901b9c58a0..d317be1067cc9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -2113,6 +2113,11 @@ public enum Cap { */ TDIGEST_MEDIAN, + /** + * Support for {@code FIRST_OVER_TIME} and {@code LAST_OVER_TIME} on {@code tdigest} type fields. + */ + TDIGEST_FIRST_LAST_OVER_TIME, + /** * A bugfix we applied to the HISTOGRAM_PERCENTILE algorithm on the tdigest type. * We previously were using hybrid-digests by accident and now use a merging digest. diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstOverTime.java index 7a96f2cd3faf5..9d946e2c4ab42 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstOverTime.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstOverTime.java @@ -15,6 +15,7 @@ import org.elasticsearch.compute.aggregation.FirstFloatByTimestampAggregatorFunctionSupplier; import org.elasticsearch.compute.aggregation.FirstIntByTimestampAggregatorFunctionSupplier; import org.elasticsearch.compute.aggregation.FirstLongByTimestampAggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.FirstTDigestByTimestampAggregatorFunctionSupplier; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Literal; @@ -52,7 +53,7 @@ public class FirstOverTime extends TimeSeriesAggregateFunction implements Option // TODO: support all types @FunctionInfo( type = FunctionType.TIME_SERIES_AGGREGATE, - returnType = { "long", "integer", "double", "exponential_histogram" }, + returnType = { "long", "integer", "double", "exponential_histogram", "tdigest" }, description = "Calculates the earliest value of a field, where recency determined by the `@timestamp` field.", appliesTo = { @FunctionAppliesTo(lifeCycle = FunctionAppliesToLifecycle.PREVIEW, version = "9.2.0") }, preview = true, @@ -62,7 +63,7 @@ public FirstOverTime( Source source, @Param( name = "field", - type = { "counter_long", "counter_integer", "counter_double", "long", "integer", "double", "exponential_histogram" }, + type = { "counter_long", "counter_integer", "counter_double", "long", "integer", "double", "exponential_histogram", "tdigest" }, description = "the metric field to calculate the value for" ) Expression field, @Param( @@ -122,7 +123,8 @@ protected TypeResolution resolveType() { field(), dt -> (dt.noCounter().isNumeric() && dt != DataType.UNSIGNED_LONG) || dt == DataType.AGGREGATE_METRIC_DOUBLE - || dt == DataType.EXPONENTIAL_HISTOGRAM, + || dt == DataType.EXPONENTIAL_HISTOGRAM + || dt == DataType.TDIGEST, sourceText(), DEFAULT, "numeric except unsigned_long" @@ -142,6 +144,7 @@ public AggregatorFunctionSupplier supplier() { case DOUBLE, COUNTER_DOUBLE -> new FirstDoubleByTimestampAggregatorFunctionSupplier(); case FLOAT -> new FirstFloatByTimestampAggregatorFunctionSupplier(); case EXPONENTIAL_HISTOGRAM -> new FirstExponentialHistogramByTimestampAggregatorFunctionSupplier(); + case TDIGEST -> new FirstTDigestByTimestampAggregatorFunctionSupplier(); default -> throw EsqlIllegalArgumentException.illegalDataType(type); }; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTime.java index e98e93a6b1cb4..60e92fba23a9a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTime.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTime.java @@ -16,6 +16,7 @@ import org.elasticsearch.compute.aggregation.LastFloatByTimestampAggregatorFunctionSupplier; import org.elasticsearch.compute.aggregation.LastIntByTimestampAggregatorFunctionSupplier; import org.elasticsearch.compute.aggregation.LastLongByTimestampAggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.LastTDigestByTimestampAggregatorFunctionSupplier; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Literal; @@ -53,7 +54,7 @@ public class LastOverTime extends TimeSeriesAggregateFunction implements Optiona // TODO: support all types @FunctionInfo( type = FunctionType.TIME_SERIES_AGGREGATE, - returnType = { "long", "integer", "double", "_tsid", "exponential_histogram" }, + returnType = { "long", "integer", "double", "_tsid", "exponential_histogram", "tdigest" }, description = "Calculates the latest value of a field, where recency determined by the `@timestamp` field.", appliesTo = { @FunctionAppliesTo(lifeCycle = FunctionAppliesToLifecycle.PREVIEW, version = "9.2.0") }, preview = true, @@ -63,7 +64,16 @@ public LastOverTime( Source source, @Param( name = "field", - type = { "counter_long", "counter_integer", "counter_double", "long", "integer", "double", "_tsid", "exponential_histogram" }, + type = { + "counter_long", + "counter_integer", + "counter_double", + "long", + "integer", + "double", + "_tsid", + "exponential_histogram", + "tdigest" }, description = "the metric field to calculate the latest value for" ) Expression field, @Param( @@ -123,7 +133,8 @@ protected TypeResolution resolveType() { field(), dt -> (dt.noCounter().isNumeric() && dt != DataType.UNSIGNED_LONG) || dt == DataType.TSID_DATA_TYPE - || dt == DataType.EXPONENTIAL_HISTOGRAM, + || dt == DataType.EXPONENTIAL_HISTOGRAM + || dt == DataType.TDIGEST, sourceText(), DEFAULT, "numeric except unsigned_long" @@ -144,6 +155,7 @@ public AggregatorFunctionSupplier supplier() { case FLOAT -> new LastFloatByTimestampAggregatorFunctionSupplier(); case TSID_DATA_TYPE -> new LastBytesRefByTimestampAggregatorFunctionSupplier(); case EXPONENTIAL_HISTOGRAM -> new LastExponentialHistogramByTimestampAggregatorFunctionSupplier(); + case TDIGEST -> new LastTDigestByTimestampAggregatorFunctionSupplier(); default -> throw EsqlIllegalArgumentException.illegalDataType(type); }; } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstOverTimeTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstOverTimeTests.java index d704e68e83cb4..af11841b999c7 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstOverTimeTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstOverTimeTests.java @@ -38,13 +38,15 @@ public FirstOverTimeTests(@Name("TestCase") Supplier @ParametersFactory public static Iterable parameters() { var suppliers = new ArrayList(); - FunctionAppliesTo histogramAppliesTo = appliesTo(FunctionAppliesToLifecycle.PREVIEW, "9.3.0", "", true); + FunctionAppliesTo expHistogramAppliesTo = appliesTo(FunctionAppliesToLifecycle.PREVIEW, "9.3.0", "", true); + FunctionAppliesTo tDigestAppliesTo = appliesTo(FunctionAppliesToLifecycle.PREVIEW, "9.4.0", "", true); var valuesSuppliers = List.of( MultiRowTestCaseSupplier.longCases(1, 1000, Long.MIN_VALUE, Long.MAX_VALUE, true), MultiRowTestCaseSupplier.intCases(1, 1000, Integer.MIN_VALUE, Integer.MAX_VALUE, true), MultiRowTestCaseSupplier.doubleCases(1, 1000, -Double.MAX_VALUE, Double.MAX_VALUE, true), - MultiRowTestCaseSupplier.exponentialHistogramCases(1, 100).stream().map(s -> s.withAppliesTo(histogramAppliesTo)).toList() + MultiRowTestCaseSupplier.exponentialHistogramCases(1, 100).stream().map(s -> s.withAppliesTo(expHistogramAppliesTo)).toList(), + MultiRowTestCaseSupplier.tdigestCases(1, 100).stream().map(s -> s.withAppliesTo(tDigestAppliesTo)).toList() ); for (List valuesSupplier : valuesSuppliers) { for (TestCaseSupplier.TypedDataSupplier fieldSupplier : valuesSupplier) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTimeTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTimeTests.java index 191091f1e74ee..61241730abd91 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTimeTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTimeTests.java @@ -37,14 +37,16 @@ public LastOverTimeTests(@Name("TestCase") Supplier t @ParametersFactory public static Iterable parameters() { var suppliers = new ArrayList(); - FunctionAppliesTo histogramAppliesTo = appliesTo(FunctionAppliesToLifecycle.PREVIEW, "9.3.0", "", true); + FunctionAppliesTo expHistogramAppliesTo = appliesTo(FunctionAppliesToLifecycle.PREVIEW, "9.3.0", "", true); + FunctionAppliesTo tDigestAppliesTo = appliesTo(FunctionAppliesToLifecycle.PREVIEW, "9.4.0", "", true); var valuesSuppliers = List.of( MultiRowTestCaseSupplier.longCases(1, 1000, Long.MIN_VALUE, Long.MAX_VALUE, true), MultiRowTestCaseSupplier.intCases(1, 1000, Integer.MIN_VALUE, Integer.MAX_VALUE, true), MultiRowTestCaseSupplier.doubleCases(1, 1000, -Double.MAX_VALUE, Double.MAX_VALUE, true), MultiRowTestCaseSupplier.tsidCases(1, 1000), - MultiRowTestCaseSupplier.exponentialHistogramCases(1, 100).stream().map(s -> s.withAppliesTo(histogramAppliesTo)).toList() + MultiRowTestCaseSupplier.exponentialHistogramCases(1, 100).stream().map(s -> s.withAppliesTo(expHistogramAppliesTo)).toList(), + MultiRowTestCaseSupplier.tdigestCases(1, 100).stream().map(s -> s.withAppliesTo(tDigestAppliesTo)).toList() ); for (List valuesSupplier : valuesSuppliers) { for (TestCaseSupplier.TypedDataSupplier fieldSupplier : valuesSupplier) {