Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Document vector_selector pushdown functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesGuthrie committed Feb 11, 2022
1 parent c4a4e10 commit bb5899b
Showing 1 changed file with 262 additions and 10 deletions.
272 changes: 262 additions & 10 deletions src/aggregates/vector_selector.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,141 @@
//! # Vector Selector
//! The `vector_selector` aggregate implements Prometheus' `VectorSelector` processing.
//!
//! ## Instant Vector
//! A Prometheus `Instant Vector` is a set of time series, each with a single data point. The single
//! data point of each series is determined by looking back over a window of time (`lookback`) from
//! the reference point `t` and retrieving the most recent sample.
//!
//! As an example, we see that given an evaluation time `t`, we look back on the timeseries `ts`
//! over a window of size `lookback` for the most recent sample, in this case `e`.
//!
//! ```text
//! t
//! |
//! ts: a b c d e f g
//! ^
//! out: e
//! |---lookback----|
//! ```
//!
//! ## Range Queries
//!
//! This can be applied over a time range to determine a range of `Instant Vector`s, i.e. an
//! `Instant Vector` for multiple reference points `(t_1, t_2, ..., t_n)`.
//!
//! The range query is determined by a `start_time`, an `end_time`, a `bucket_width`, and
//! `lookback`. These dictate the times `t_i` at which we look back over the time series to find
//! the most recent sample.
//!
//! As an example, for the range [`t_1, t_3`], and `bucket_width`, each of the points in time `t_1`,
//! `t_2`, `t_3` is separated by `bucket_width`, and uses `lookback` to look back over the time
//! series and determine the most recent sample.
//!
//! ```text
//! t_1 t_2 t_3
//! |----bucket_width----| |
//! | | |
//! ts: a b c d e f g h i j k
//! ^---| ^ x--------------|
//! out: e j Ø
//! |---lookback---| |---lookback---| |---lookback---|
//! ```
//!
//! ## Vector Selector
//!
//! Both a single and a range of `Instant Vector`s can be obtained with the `Vector Selector`.
//!
//! # Usage from SQL
//!
//! The pseudo-SQL definition of the `vector_selector` aggregate is:
//!
//! ```sql
//! FUNCTION vector_selector(
//! start_time TIMESTAMPTZ
//! , end_time TIMESTAMPTZ,
//! , bucket_width BIGINT
//! , lookback BIGINT
//! , sample_time TIMESTAMPTZ
//! , sample_value DOUBLE PRECISION
//! )
//! RETURNS DOUBLE PRECISION[]
//! ```
//!
//! The parameters `start_time`, `end_time`, `bucket_width`, and `lookback` function as described
//! above. `bucket_width` and `lookback` are specified in milliseconds. The parameters `sample_time`
//! and `sample_value` are values in the underlying timeseries which is being aggregated over.
//!
//! The vector selector returns an array containing _only_ the sample values for each window, or
//! `NULL` if there was no sample present for a given window. Note: it does not return any
//! timestamps.
//!
//! Note: The `vector_selector` aggregate expects to be evaluated over time series data in the range
//! [`start_time` - `lookback`, `end_time`]. If any of the values of `sample_time` is _outside_ of
//! this range, the aggregate will raise a Postgres ERROR.
//!
//! ## Example SQL query
//!
//! First, we assume a table `test_table` with the following definition:
//!
//! ```sql
//! CREATE TABLE test_table(t TIMESTAMPTZ, v DOUBLE PRECISION);
//! ```
//!
//! We can then invoke the `vector_selector` aggregate on this table
//!
//! ```sql
//! SELECT
//! vector_selector(
//! '2000-01-02T15:00:00+00:00'::TIMESTAMPTZ
//! , '2000-01-02T15:10:00+00:00'::TIMESTAMPTZ
//! , 10 * 60 * 1000
//! , 10 * 60 * 1000
//! , t
//! , v)
//! FROM test_table
//! WHERE t >= '2000-01-02T15:00:00+00:00'::TIMESTAMPTZ - '10 minutes'::INTERVAL
//! AND t <= '2000-01-02T15:10:00+00:00'::TIMESTAMPTZ
//! ```
//!
//! Here we define that we want to determine the `Instant Vector`s for the range
//! [2000-01-02T15:00:00+00:00, 2000-01-02T15:10:00+00:00], in 10 minute buckets, and looking back
//! over 10 minutes of data. Note that we restrict the data being aggregated over to correspond to
//! the arguments which we have passed to `vector_selector`.
//!
//! If `test_table` contained the following values:
//! ```text
//! t | v
//! ------------------------+-----
//! 2000-01-02 15:00:00+00 | 0
//! 2000-01-02 15:05:00+00 | 10
//! ```
//!
//! The output of the above `vector_selector` query would be:
//!
//! ```text
//! vector_selector
//! -----------------
//! {0,20}
//! (1 row)
//! ```
//!
//! We could modify the above query to obtain a single `Instant Vector` for the timestamp
//! '2000-01-02T15:00:00+00:00' with:
//!
//! ```sql
//! SELECT
//! vector_selector(
//! '2000-01-02T15:00:00+00:00'::TIMESTAMPTZ
//! , '2000-01-02T15:00:00+00:00'::TIMESTAMPTZ
//! , 0
//! , 10 * 60 * 1000
//! , t
//! , v)
//! FROM test_table
//! WHERE t >= '2000-01-02T15:00:00+00:00'::TIMESTAMPTZ - '10 minutes'::INTERVAL
//! AND t <= '2000-01-02T15:00:00+00:00'::TIMESTAMPTZ
//! ```
//!
use pgx::*;

use pgx::error;
Expand All @@ -9,14 +147,8 @@ use serde::{Deserialize, Serialize};
use crate::palloc::{Inner, InternalAsValue, ToInternal};
use crate::raw::bytea;

// a vector selector aggregate has the same semantics as parse.VectorSelector processing
// in Prometheus. Namely, for all timestamps ts in the series:
// ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval
// we return the last sample.value such that the time sample.time <= ts and sample.time >= ts-lookback.
// if such a sample doesn't exist return NULL (None).
// thus, the vector selector returns a regular series of values corresponding to all the points in the
// ts series above.
// Note that for performance, this aggregate is parallel-izable, combinable, and does not expect ordered inputs.
/// Note that for performance, this aggregate is parallel-izable, combinable, and does not expect
/// ordered inputs.
#[allow(clippy::too_many_arguments)]
#[pg_extern(immutable, parallel_safe)]
pub fn vector_selector_transition(
Expand Down Expand Up @@ -209,7 +341,7 @@ impl VectorSelector {
|| self.lookback != other.lookback
|| self.elements.len() != other.elements.len()
{
error!("trying to combine incomptible vector selectors")
error!("trying to combine incompatible vector selectors")
}

for it in self.elements.iter_mut().zip(other.elements.iter()) {
Expand Down Expand Up @@ -348,6 +480,15 @@ mod tests {
);
}

/// ```text
/// t_1 t_2
/// |----bucket_width---|
/// | |
/// ts: a b c d e f g h i j k
/// ^ ^
/// out: a k
/// |------lookback-----|------lookback-----|
/// ```
#[pg_test]
fn test_vector_selector_bucket_and_lookback_size_exact_match_beginning_end() {
setup();
Expand All @@ -368,8 +509,92 @@ mod tests {
assert_eq!(result, vec![Some(0_f64), Some(100_f64)]);
}

/// ```text
/// t_1 t_2 t_3
/// |----bucket_width----|----bucket_width----|
/// | | |
/// ts: a b c d
/// ^-| x------------| ^---------|
/// out: b Ø d
/// |--lookback--| |--lookback--| |--lookback--|
/// ```
#[pg_test]
fn test_vector_selector_start_end_not_aligned_with_table_data() {
fn test_vector_selector_lookback_smaller_than_bucket_with_and_without_results_in_lookback() {
Spi::run(
r#"
CREATE TABLE gfv_test_table(t TIMESTAMPTZ, v DOUBLE PRECISION);
INSERT INTO gfv_test_table (t, v) VALUES
('2000-01-02T14:58:00+00:00',0),
('2000-01-02T14:59:00+00:00',10),
('2000-01-02T15:01:00+00:00',20),
('2000-01-02T15:09:00+00:00',30);
"#,
);
let result = Spi::get_one::<Vec<Option<f64>>>(
r#"
SELECT
vector_selector(
'2000-01-02T15:00:00+00:00'::TIMESTAMPTZ
, '2000-01-02T15:10:00+00:00'::TIMESTAMPTZ
, 5 * 60 * 1000
, 2 * 60 * 1000
, t
, v order by t)
FROM gfv_test_table
;"#,
)
.expect("SQL query failed");
assert_eq!(result, vec![Some(10_f64), None, Some(30_f64)]);
}

/// ```text
/// t_1 t_2 t_3 t_4
/// |-bucket_width-|-bucket_width-|-bucket_width-|
/// | | | |
/// ts: a b
/// ^---|--------------| ^----|--------------|
/// out: a a b b
/// |-------lookback-------|
/// |-------lookback-------|
/// |-------lookback-------|
/// ``` |-------lookback-------|
#[pg_test]
fn test_vector_selector_lookback_larger_than_bucket_width() {
setup();
let result = Spi::get_one::<Vec<Option<f64>>>(
r#"
SELECT
vector_selector(
'2000-01-02T15:00:30+00:00'::TIMESTAMPTZ
, '2000-01-02T15:08:00+00:00'::TIMESTAMPTZ
, 150 * 1000
, 180 * 1000
, t
, v order by t)
FROM gfv_test_table
WHERE t >= '2000-01-02T15:00:30+00:00'::TIMESTAMPTZ - '180 SECONDS'::INTERVAL
AND t <= '2000-01-02T15:08:00+00:00'::TIMESTAMPTZ
;"#,
)
.expect("SQL query failed");
assert_eq!(
result,
vec![Some(0_f64), Some(0_f64), Some(10_f64), Some(10_f64)]
);
}

/// ```text
/// t_1 t_2
/// |----bucket_width---|
/// | |
/// ts: a b c d e f g h i j k
/// x-----------------| ^
/// out: Ø j
/// |------lookback-----|------lookback-----|
/// ```
#[pg_test]
fn test_vector_selector_start_end_not_aligned_with_timeseries_data() {
setup();
let result = Spi::get_one::<Vec<Option<f64>>>(
r#"
Expand All @@ -388,6 +613,15 @@ mod tests {
assert_eq!(result, vec![None, Some(90_f64)]);
}

/// ```text
/// t_1 t_6
/// bucket_widths: |---|---|---|---|---|
/// | | | | | |
/// ts: a b c d e f g h i j k
/// ^ ^ ^ ^ ^ ^
/// out: a c e g i k
/// lookbacks: |---|---|---|---|---|---|
/// ```
#[pg_test]
fn test_vector_selector_smaller_bucket_and_lookback_size() {
setup();
Expand Down Expand Up @@ -418,6 +652,15 @@ mod tests {
);
}

/// ```text
/// t_1 t_6
/// bucket_widths: |---|---|---|---|---|
/// | | | | | |
/// ts: a b c d e f g h i j k
/// ^ ^ ^ ^ ^ ^
/// out: a c e g i k
/// lookbacks: |---|---|---|---|---|---|
/// ```
#[pg_test]
fn test_vector_selector_smaller_bucket_and_lookback_size_randomized_input() {
setup();
Expand Down Expand Up @@ -453,6 +696,15 @@ mod tests {
);
}

/// ```text
/// t_1 t_6
/// bucket_widths: |---|---|---|---|---|
/// | | | | | |
/// ts: a b c d e f g h i j k
/// ^ ^ ^ ^ ^ ^
/// out: a c e g i k
/// lookbacks: |---|---|---|---|---|---|
/// ```
#[pg_test]
fn test_vector_selector_parallel_execution_smaller_bucket_lookback() {
setup();
Expand Down

0 comments on commit bb5899b

Please sign in to comment.