feat: add quantile_tdigest_weighted agg func #13400

Merged
merged 2 commits into from
Oct 24, 2023
@@ -0,0 +1,63 @@
---
title: QUANTILE_TDIGEST_WEIGHTED
---
import FunctionDescription from '@site/src/components/FunctionDescription';

<FunctionDescription description="Introduced or updated: v1.2.174"/>

Computes an approximate quantile of a numeric data sequence using the [t-digest](https://github.com/tdunning/t-digest/blob/master/docs/t-digest-paper/histo.pdf) algorithm.
This function takes the weight of each sequence member into account. Memory consumption is **log(n)**, where **n** is the number of values.

:::caution
NULL values are not included in the calculation.
:::

## Syntax

```sql
QUANTILE_TDIGEST_WEIGHTED(<level1>[, <level2>, ...])(<expr>, <weight_expr>)
```

## Arguments

| Argument        | Description                                                                                                               |
|-----------------|---------------------------------------------------------------------------------------------------------------------------|
| `<levelN>`      | Quantile level: a constant floating-point number from 0 to 1. A level in the range [0.01, 0.99] is recommended.           |
| `<expr>`        | Any numeric expression.                                                                                                   |
| `<weight_expr>` | Any unsigned integer expression. The weight is the number of times the value occurs.                                      |

## Return Type

Returns either a Float64 value or an array of Float64 values, depending on the number of quantile levels specified.

## Example

```sql
-- Create a table and insert sample data
CREATE TABLE sales_data (
id INT,
sales_person_id INT,
sales_amount FLOAT
);

INSERT INTO sales_data (id, sales_person_id, sales_amount)
VALUES (1, 1, 5000),
(2, 2, 5500),
(3, 3, 6000),
(4, 4, 6500),
(5, 5, 7000);

SELECT QUANTILE_TDIGEST_WEIGHTED(0.5)(sales_amount, 1) AS median_sales_amount
FROM sales_data;

median_sales_amount|
-------------------+
6000.0|

SELECT QUANTILE_TDIGEST_WEIGHTED(0.5, 0.8)(sales_amount, 1)
FROM sales_data;

quantile_tdigest_weighted(0.5, 0.8)(sales_amount)|
-------------------------------------------------+
[6000.0,7000.0] |
```
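
To make the meaning of the weight concrete, the sketch below computes an exact weighted quantile with the nearest-rank rule, treating each `(value, weight)` pair as `weight` copies of `value`. It is only an illustration: `weighted_quantile` is a hypothetical helper, not part of Databend, and it does not use t-digest, so its results can differ from the approximate values the SQL function returns.

```rust
/// Exact weighted quantile (nearest-rank), for illustration only.
/// Each `(value, weight)` pair counts as `weight` occurrences of `value`.
/// Hypothetical helper: this is NOT the t-digest algorithm used by the SQL function.
fn weighted_quantile(pairs: &[(f64, u64)], level: f64) -> Option<f64> {
    // Drop zero-weight entries; they contribute no occurrences.
    let mut sorted: Vec<(f64, u64)> = pairs.iter().copied().filter(|&(_, w)| w > 0).collect();
    if sorted.is_empty() {
        return None;
    }
    sorted.sort_by(|a, b| a.0.total_cmp(&b.0));

    let total: u64 = sorted.iter().map(|&(_, w)| w).sum();
    // Rank of the requested level in the expanded sequence, at least 1.
    let target = (level.clamp(0.0, 1.0) * total as f64).ceil().max(1.0) as u64;

    // Walk the sorted values until the cumulative weight reaches the rank.
    let mut seen = 0u64;
    for &(value, weight) in &sorted {
        seen += weight;
        if seen >= target {
            return Some(value);
        }
    }
    sorted.last().map(|&(value, _)| value)
}
```

With the sample rows above and a weight of 1 on every row, this sketch returns 6000.0 for level 0.5. If the 7000 row instead carried a weight of 10, the same call would return 7000.0, because that value would then account for most of the occurrences.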
@@ -7,7 +7,7 @@ import FunctionDescription from '@site/src/components/FunctionDescription';

Computes an approximate quantile of a numeric data sequence using the [t-digest](https://github.com/tdunning/t-digest/blob/master/docs/t-digest-paper/histo.pdf) algorithm.

:::note
:::caution
NULL values are not included in the calculation.
:::

69 changes: 35 additions & 34 deletions docs/doc/15-sql-functions/10-aggregate-functions/index.md
@@ -6,37 +6,38 @@ Aggregate functions are essential tools in SQL that allow you to perform calcula

These functions help you extract and summarize data from databases to gain valuable insights.

| Function Name | What It Does |
|-------------------------------------------------------------|---------------------------------------------------------------------------|
| [ANY](aggregate-any.md) | Checks if any row meets the specified condition |
| [APPROX_COUNT_DISTINCT](aggregate-approx-count-distinct.md) | Estimates the number of distinct values with HyperLogLog |
| [ARG_MAX](aggregate-arg-max.md) | Finds the arg value for the maximum val value |
| [ARG_MIN](aggregate-arg-min.md) | Finds the arg value for the minimum val value |
| [AVG_IF](aggregate-avg-if.md) | Calculates the average for rows meeting a condition |
| [ARRAY_AGG](aggregate-array-agg.md) | Converts all the values of a column to an Array |
| [AVG](aggregate-avg.md) | Calculates the average value of a specific column |
| [COUNT_DISTINCT](aggregate-count-distinct.md) | Counts the number of distinct values in a column |
| [COUNT_IF](aggregate-count-if.md) | Counts rows meeting a specified condition |
| [COUNT](aggregate-count.md) | Counts the number of rows that meet certain criteria |
| [COVAR_POP](aggregate-covar-pop.md) | Returns the population covariance of a set of number pairs |
| [COVAR_SAMP](aggregate-covar-samp.md) | Returns the sample covariance of a set of number pairs |
| [GROUP_ARRAY_MOVING_AVG](aggregate-group-array-moving-avg.md) | Returns an array with elements calculates the moving average of input values |
| [GROUP_ARRAY_MOVING_SUM](aggregate-group-array-moving-sum.md) | Returns an array with elements calculates the moving sum of input values |
| [KURTOSIS](aggregate-kurtosis.md) | Calculates the excess kurtosis of a set of values |
| [MAX_IF](aggregate-max-if.md) | Finds the maximum value for rows meeting a condition |
| [MAX](aggregate-max.md) | Finds the largest value in a specific column |
| [MEDIAN](aggregate-median.md) | Calculates the median value of a specific column |
| [MEDIAN_TDIGEST](aggregate-median-tdigest.md) | Calculates the median value of a specific column using t-digest algorithm |
| [MIN_IF](aggregate-min-if.md) | Finds the minimum value for rows meeting a condition |
| [MIN](aggregate-min.md) | Finds the smallest value in a specific column |
| [QUANTILE_CONT](aggregate-quantile-cont.md) | Calculates the interpolated quantile for a specific column |
| [QUANTILE_DISC](aggregate-quantile-disc.md) | Calculates the quantile for a specific column |
| [QUANTILE_TDIGEST](aggregate-quantile-tdigest.md) | Calculates the quantile using t-digest algorithm |
| [RETENTION](aggregate-retention.md) | Calculates retention for a set of events |
| [SKEWNESS](aggregate-skewness.md) | Calculates the skewness of a set of values |
| [STDDEV_POP](aggregate-stddev-pop.md) | Calculates the population standard deviation of a column |
| [STDDEV_SAMP](aggregate-stddev-samp.md) | Calculates the sample standard deviation of a column |
| [STRING_AGG](aggregate-string-agg.md) | Converts all the non-NULL values to String, separated by the delimiter |
| [SUM_IF](aggregate-sum-if.md) | Adds up the values meeting a condition of a specific column |
| [SUM](aggregate-sum.md) | Adds up the values of a specific column |
| [WINDOW_FUNNEL](aggregate-windowfunnel.md) | Analyzes user behavior in a time-ordered sequence of events |
| Function Name | What It Does |
|---------------------------------------------------------------------|------------------------------------------------------------------------------|
| [ANY](aggregate-any.md) | Checks if any row meets the specified condition |
| [APPROX_COUNT_DISTINCT](aggregate-approx-count-distinct.md) | Estimates the number of distinct values with HyperLogLog |
| [ARG_MAX](aggregate-arg-max.md) | Finds the arg value for the maximum val value |
| [ARG_MIN](aggregate-arg-min.md) | Finds the arg value for the minimum val value |
| [AVG_IF](aggregate-avg-if.md) | Calculates the average for rows meeting a condition |
| [ARRAY_AGG](aggregate-array-agg.md) | Converts all the values of a column to an Array |
| [AVG](aggregate-avg.md) | Calculates the average value of a specific column |
| [COUNT_DISTINCT](aggregate-count-distinct.md) | Counts the number of distinct values in a column |
| [COUNT_IF](aggregate-count-if.md) | Counts rows meeting a specified condition |
| [COUNT](aggregate-count.md) | Counts the number of rows that meet certain criteria |
| [COVAR_POP](aggregate-covar-pop.md) | Returns the population covariance of a set of number pairs |
| [COVAR_SAMP](aggregate-covar-samp.md) | Returns the sample covariance of a set of number pairs |
| [GROUP_ARRAY_MOVING_AVG](aggregate-group-array-moving-avg.md) | Returns an array with elements calculates the moving average of input values |
| [GROUP_ARRAY_MOVING_SUM](aggregate-group-array-moving-sum.md) | Returns an array with elements calculates the moving sum of input values |
| [KURTOSIS](aggregate-kurtosis.md) | Calculates the excess kurtosis of a set of values |
| [MAX_IF](aggregate-max-if.md) | Finds the maximum value for rows meeting a condition |
| [MAX](aggregate-max.md) | Finds the largest value in a specific column |
| [MEDIAN](aggregate-median.md) | Calculates the median value of a specific column |
| [MEDIAN_TDIGEST](aggregate-median-tdigest.md) | Calculates the median value of a specific column using t-digest algorithm |
| [MIN_IF](aggregate-min-if.md) | Finds the minimum value for rows meeting a condition |
| [MIN](aggregate-min.md) | Finds the smallest value in a specific column |
| [QUANTILE_CONT](aggregate-quantile-cont.md) | Calculates the interpolated quantile for a specific column |
| [QUANTILE_DISC](aggregate-quantile-disc.md) | Calculates the quantile for a specific column |
| [QUANTILE_TDIGEST](aggregate-quantile-tdigest.md) | Calculates the quantile using t-digest algorithm |
| [QUANTILE_TDIGEST_WEIGHTED](aggregate-quantile-tdigest-weighted.md) | Calculates the weighted quantile using t-digest algorithm                    |
| [RETENTION](aggregate-retention.md) | Calculates retention for a set of events |
| [SKEWNESS](aggregate-skewness.md) | Calculates the skewness of a set of values |
| [STDDEV_POP](aggregate-stddev-pop.md) | Calculates the population standard deviation of a column |
| [STDDEV_SAMP](aggregate-stddev-samp.md) | Calculates the sample standard deviation of a column |
| [STRING_AGG](aggregate-string-agg.md) | Converts all the non-NULL values to String, separated by the delimiter |
| [SUM_IF](aggregate-sum-if.md) | Adds up the values meeting a condition of a specific column |
| [SUM](aggregate-sum.md) | Adds up the values of a specific column |
| [WINDOW_FUNNEL](aggregate-windowfunnel.md) | Analyzes user behavior in a time-ordered sequence of events |
33 changes: 18 additions & 15 deletions src/query/functions/src/aggregates/aggregate_quantile_tdigest.rs
@@ -47,11 +47,11 @@ use crate::aggregates::AggregateFunctionRef;
use crate::aggregates::StateAddr;
use crate::BUILTIN_FUNCTIONS;

const MEDIAN: u8 = 0;
const QUANTILE: u8 = 1;
pub(crate) const MEDIAN: u8 = 0;
pub(crate) const QUANTILE: u8 = 1;

#[derive(Serialize, Deserialize)]
struct QuantileTDigestState {
pub(crate) struct QuantileTDigestState {
epsilon: u32,
max_centroids: usize,

@@ -67,7 +67,7 @@ struct QuantileTDigestState {
}

impl QuantileTDigestState {
fn new() -> Self {
pub(crate) fn new() -> Self {
Self {
epsilon: 100u32,
max_centroids: 2048,
@@ -82,17 +82,17 @@ impl QuantileTDigestState {
}
}

fn add(&mut self, other: f64) {
pub(crate) fn add(&mut self, other: f64, weight: Option<u64>) {
if self.unmerged_weights.len() + self.weights.len() >= self.max_centroids - 1 {
self.compress();
}

self.unmerged_weights.push(1f64);
self.unmerged_weights.push(weight.unwrap_or(1) as f64);
self.unmerged_means.push(other);
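// Accumulate the same weight that was pushed above so the running total stays consistent.
self.unmerged_total_weight += weight.unwrap_or(1) as f64;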
}

fn merge(&mut self, rhs: &mut Self) -> Result<()> {
pub(crate) fn merge(&mut self, rhs: &mut Self) -> Result<()> {
if rhs.len() == 0 {
return Ok(());
}
@@ -107,7 +107,11 @@ impl QuantileTDigestState {
Ok(())
}

fn merge_result(&mut self, builder: &mut ColumnBuilder, levels: Vec<f64>) -> Result<()> {
pub(crate) fn merge_result(
&mut self,
builder: &mut ColumnBuilder,
levels: Vec<f64>,
) -> Result<()> {
if levels.len() > 1 {
let builder = match builder {
ColumnBuilder::Array(box b) => b,
@@ -126,7 +130,7 @@ impl QuantileTDigestState {
Ok(())
}

fn quantile(&mut self, level: f64) -> f64 {
pub(crate) fn quantile(&mut self, level: f64) -> f64 {
self.compress();
if self.weights.is_empty() {
return 0f64;
@@ -317,13 +321,13 @@ where T: Number + AsPrimitive<f64>
Some(bitmap) => {
for (value, is_valid) in column.iter().zip(bitmap.iter()) {
if is_valid {
state.add(value.as_());
state.add(value.as_(), None);
}
}
}
None => {
for value in column.iter() {
state.add(value.as_());
state.add(value.as_(), None);
}
}
}
@@ -335,7 +339,7 @@ where T: Number + AsPrimitive<f64>
let v = NumberType::<T>::index_column(&column, row);
if let Some(v) = v {
let state = place.get::<QuantileTDigestState>();
state.add(v.as_())
state.add(v.as_(), None)
}
Ok(())
}
@@ -350,8 +354,7 @@ where T: Number + AsPrimitive<f64>
column.iter().zip(places.iter()).for_each(|(v, place)| {
let addr = place.next(offset);
let state = addr.get::<QuantileTDigestState>();
let v = v.as_();
state.add(v)
state.add(v.as_(), None)
});
Ok(())
}
@@ -489,7 +492,7 @@ pub fn try_create_aggregate_quantile_tdigest_function<const TYPE: u8>(
}

_ => Err(ErrorCode::BadDataValueType(format!(
"{} does not support type '{:?}'",
"{} only supports numeric types, but got '{:?}'",
display_name, arguments[0]
))),
})