Skip to content

Commit

Permalink
[FEAT] Approximate quantile aggregation (pulled into main) (#2179)
Browse files Browse the repository at this point in the history
Puts the finishing touches on #2076

---------

Co-authored-by: Maxime Petitjean <[email protected]>
Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
  • Loading branch information
3 people authored May 2, 2024
1 parent 2e90b70 commit 99a0ac0
Show file tree
Hide file tree
Showing 30 changed files with 1,021 additions and 26 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ rayon = "1.7.0"
regex = "1.10.4"
rstest = "0.18.2"
serde_json = "1.0.116"
sketches-ddsketch = {version = "0.2.2", features = ["use_serde"]}
snafu = {version = "0.7.4", features = ["futures"]}
tokio = {version = "1.37.0", features = ["net", "time", "bytes", "process", "signal", "macros", "rt", "rt-multi-thread"]}
tokio-stream = {version = "0.1.14", features = ["fs"]}
Expand Down
1 change: 1 addition & 0 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,7 @@ class PyExpr:
def if_else(self, if_true: PyExpr, if_false: PyExpr) -> PyExpr: ...
def count(self, mode: CountMode) -> PyExpr: ...
def sum(self) -> PyExpr: ...
def approx_percentiles(self, percentiles: float | list[float]) -> PyExpr: ...
def mean(self) -> PyExpr: ...
def min(self) -> PyExpr: ...
def max(self) -> PyExpr: ...
Expand Down
63 changes: 63 additions & 0 deletions daft/expressions/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,69 @@ def sum(self) -> Expression:
expr = self._expr.sum()
return Expression._from_pyexpr(expr)

def approx_percentiles(self, percentiles: builtins.float | builtins.list[builtins.float]) -> Expression:
"""Calculates the approximate percentile(s) for a column of numeric values
For numeric columns, we use the `sketches_ddsketch crate <https://docs.rs/sketches-ddsketch/latest/sketches_ddsketch/index.html>`_.
This is a Rust implementation of the paper `DDSketch: A Fast and Fully-Mergeable Quantile Sketch with Relative-Error Guarantees (Masson et al.) <https://arxiv.org/pdf/1908.10693>`_
1. Null values are ignored in the computation of the percentiles
2. If all values are Null then the result will also be Null
3. If ``percentiles`` are supplied as a single float, then the resultant column is a ``Float64`` column
4. If ``percentiles`` is supplied as a list, then the resultant column is a ``FixedSizeList[Float64; N]`` column, where ``N`` is the length of the supplied list.
Example of a global calculation of approximate percentiles:
>>> df = daft.from_pydict({"scores": [1, 2, 3, 4, 5, None]})
>>> df = df.agg(
>>> df["scores"].approx_percentiles(0.5).alias("approx_median_score"),
>>> df["scores"].approx_percentiles([0.25, 0.5, 0.75]).alias("approx_percentiles_scores"),
>>> )
>>> df.show()
╭─────────────────────┬────────────────────────────────╮
│ approx_median_score ┆ approx_percentiles_scores │
│ --- ┆ --- │
│ Float64 ┆ FixedSizeList[Float64; 3] │
╞═════════════════════╪════════════════════════════════╡
│ 2.9742334234767167 ┆ [1.993661701417351, 2.9742334… │
╰─────────────────────┴────────────────────────────────╯
(Showing first 1 of 1 rows)
Example of a grouped calculation of approximate percentiles:
>>> df = daft.from_pydict({
>>> "class": ["a", "a", "a", "b", "c"],
>>> "scores": [1, 2, 3, 1, None],
>>> })
>>> df = df.groupby("class").agg(
>>> df["scores"].approx_percentiles(0.5).alias("approx_median_score"),
>>> df["scores"].approx_percentiles([0.25, 0.5, 0.75]).alias("approx_percentiles_scores"),
>>> )
>>> df.show()
╭───────┬─────────────────────┬────────────────────────────────╮
│ class ┆ approx_median_score ┆ approx_percentiles_scores │
│ --- ┆ --- ┆ --- │
│ Utf8 ┆ Float64 ┆ FixedSizeList[Float64; 3] │
╞═══════╪═════════════════════╪════════════════════════════════╡
│ c ┆ None ┆ None │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ a ┆ 1.993661701417351 ┆ [0.9900000000000001, 1.993661… │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ b ┆ 0.9900000000000001 ┆ [0.9900000000000001, 0.990000… │
╰───────┴─────────────────────┴────────────────────────────────╯
(Showing first 3 of 3 rows)
Args:
percentiles: the percentile(s) at which to find approximate values at. Can be provided as a single
float or a list of floats.
Returns:
A new expression representing the approximate percentile(s). If `percentiles` was a single float, this will be a new `Float64` expression. If `percentiles` was a list of floats, this will be a new expression with type: `FixedSizeList[Float64, len(percentiles)]`.
"""
expr = self._expr.approx_percentiles(percentiles)
return Expression._from_pyexpr(expr)

def mean(self) -> Expression:
"""Calculates the mean of the values in the expression"""
expr = self._expr.mean()
Expand Down
1 change: 1 addition & 0 deletions docs/source/api_docs/expressions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ The following can be used with DataFrame.agg or GroupedDataFrame.agg
Expression.any_value
Expression.agg_list
Expression.agg_concat
Expression.approx_percentiles

.. _expression-accessor-properties:
.. _api-string-expression-operations:
Expand Down
2 changes: 2 additions & 0 deletions src/daft-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ chrono = {workspace = true}
chrono-tz = {workspace = true}
comfy-table = {workspace = true}
common-error = {path = "../common/error", default-features = false}
daft-sketch = {path = "../daft-sketch", default-features = false}
dyn-clone = "1.0.17"
fnv = "1.0.7"
html-escape = {workspace = true}
Expand All @@ -27,6 +28,7 @@ rand = {workspace = true}
regex = {workspace = true}
serde = {workspace = true}
serde_json = {workspace = true}
sketches-ddsketch = {workspace = true}

[dependencies.image]
default-features = false
Expand Down
1 change: 0 additions & 1 deletion src/daft-core/src/array/from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::datatypes::{
};

use crate::array::DataArray;

use common_error::{DaftError, DaftResult};

impl<T: DaftNumericType> From<(&str, Box<arrow2::array::PrimitiveArray<T::Native>>)>
Expand Down
109 changes: 109 additions & 0 deletions src/daft-core/src/array/ops/approx_sketch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
use super::as_arrow::AsArrow;
use super::from_arrow::FromArrow;
use super::DaftApproxSketchAggable;
use crate::array::ops::GroupIndices;
use crate::{array::StructArray, datatypes::*};
use arrow2::array::Array;
use common_error::DaftResult;
use sketches_ddsketch::{Config, DDSketch};

impl DaftApproxSketchAggable for &DataArray<Float64Type> {
type Output = DaftResult<StructArray>;

fn approx_sketch(&self) -> Self::Output {
let primitive_arr = self.as_arrow();
let arrow_array = if primitive_arr.is_empty() {
daft_sketch::into_arrow2(vec![])
} else if primitive_arr.null_count() > 0 {
let sketch = primitive_arr
.iter()
.fold(None, |acc, value| match (acc, value) {
(acc, None) => acc,
(None, Some(v)) => {
let mut sketch = DDSketch::new(Config::defaults());
sketch.add(*v);
Some(sketch)
}
(Some(mut acc), Some(v)) => {
acc.add(*v);
Some(acc)
}
});
daft_sketch::into_arrow2(vec![sketch])
} else {
let sketch = primitive_arr.values_iter().fold(
DDSketch::new(Config::defaults()),
|mut acc, value| {
acc.add(*value);
acc
},
);

daft_sketch::into_arrow2(vec![Some(sketch)])
};

StructArray::from_arrow(
Field::new(
&self.field.name,
DataType::from(&*daft_sketch::ARROW2_DDSKETCH_DTYPE),
)
.into(),
arrow_array,
)
}

fn grouped_approx_sketch(&self, groups: &GroupIndices) -> Self::Output {
let arrow_array = self.as_arrow();
let sketch_per_group = if arrow_array.is_empty() {
daft_sketch::into_arrow2(vec![])
} else if arrow_array.null_count() > 0 {
let sketches: Vec<Option<DDSketch>> = groups
.iter()
.map(|g| {
g.iter().fold(None, |acc, index| {
let idx = *index as usize;
match (acc, arrow_array.is_null(idx)) {
(acc, true) => acc,
(None, false) => {
let mut sketch = DDSketch::new(Config::defaults());
sketch.add(arrow_array.value(idx));
Some(sketch)
}
(Some(mut acc), false) => {
acc.add(arrow_array.value(idx));
Some(acc)
}
}
})
})
.collect();

daft_sketch::into_arrow2(sketches)
} else {
let sketches = groups
.iter()
.map(|g| {
Some(
g.iter()
.fold(DDSketch::new(Config::defaults()), |mut acc, index| {
let idx = *index as usize;
acc.add(arrow_array.value(idx));
acc
}),
)
})
.collect();

daft_sketch::into_arrow2(sketches)
};

StructArray::from_arrow(
Field::new(
&self.field.name,
DataType::from(&*daft_sketch::ARROW2_DDSKETCH_DTYPE),
)
.into(),
sketch_per_group,
)
}
}
74 changes: 74 additions & 0 deletions src/daft-core/src/array/ops/merge_sketch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use super::from_arrow::FromArrow;
use super::DaftMergeSketchAggable;
use crate::array::ops::GroupIndices;
use crate::{array::StructArray, datatypes::*};
use common_error::{DaftError, DaftResult};

impl DaftMergeSketchAggable for &StructArray {
type Output = DaftResult<StructArray>;

fn merge_sketch(&self) -> Self::Output {
let sketches_array = daft_sketch::from_arrow2(self.to_arrow())?;
let sketch =
sketches_array
.into_iter()
.try_fold(None, |acc, value| match (acc, value) {
(acc, None) => Ok::<_, DaftError>(acc),
(None, Some(v)) => Ok(Some(v)),
(Some(mut acc), Some(v)) => {
acc.merge(&v).map_err(|err| {
DaftError::ComputeError(format!("Error merging sketches: {}", err))
})?;
Ok(Some(acc))
}
})?;
let arrow_array = daft_sketch::into_arrow2(vec![sketch]);

StructArray::from_arrow(
Field::new(
&self.field.name,
DataType::from(&*daft_sketch::ARROW2_DDSKETCH_DTYPE),
)
.into(),
arrow_array,
)
}

fn grouped_merge_sketch(&self, groups: &GroupIndices) -> Self::Output {
let sketches_array = daft_sketch::from_arrow2(self.to_arrow())?;

let sketch_per_group = groups
.iter()
.map(|g| {
g.iter().try_fold(None, |acc, index| {
let idx = *index as usize;
match (acc, sketches_array[idx].is_none()) {
(acc, true) => Ok::<_, DaftError>(acc),
(None, false) => Ok(sketches_array[idx].clone()),
(Some(mut acc), false) => {
acc.merge(sketches_array[idx].as_ref().unwrap())
.map_err(|err| {
DaftError::ComputeError(format!(
"Error merging sketches: {}",
err
))
})?;
Ok(Some(acc))
}
}
})
})
.collect::<DaftResult<Vec<_>>>()?;

let arrow_array = daft_sketch::into_arrow2(sketch_per_group);

StructArray::from_arrow(
Field::new(
&self.field.name,
DataType::from(&*daft_sketch::ARROW2_DDSKETCH_DTYPE),
)
.into(),
arrow_array,
)
}
}
15 changes: 15 additions & 0 deletions src/daft-core/src/array/ops/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod abs;
mod apply;
mod approx_sketch;
mod arange;
mod arithmetic;
pub mod arrow2;
Expand Down Expand Up @@ -31,12 +32,14 @@ mod list;
mod list_agg;
mod log;
mod mean;
mod merge_sketch;
mod null;
mod pairwise;
mod repr;
mod round;
mod search_sorted;
mod sign;
mod sketch_percentile;
mod sort;
mod sqrt;
mod struct_;
Expand Down Expand Up @@ -128,6 +131,18 @@ pub trait DaftSumAggable {
fn grouped_sum(&self, groups: &GroupIndices) -> Self::Output;
}

pub trait DaftApproxSketchAggable {
type Output;
fn approx_sketch(&self) -> Self::Output;
fn grouped_approx_sketch(&self, groups: &GroupIndices) -> Self::Output;
}

pub trait DaftMergeSketchAggable {
type Output;
fn merge_sketch(&self) -> Self::Output;
fn grouped_merge_sketch(&self, groups: &GroupIndices) -> Self::Output;
}

pub trait DaftMeanAggable {
type Output;
fn mean(&self) -> Self::Output;
Expand Down
Loading

0 comments on commit 99a0ac0

Please sign in to comment.