Merged
113 changes: 113 additions & 0 deletions presto-docs/src/main/sphinx/functions/aggregate.rst
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,117 @@ where :math:`f(x)` is the partial density function of :math:`x`.
The function uses the stream summary data structure proposed in the paper
`Efficient computation of frequent and top-k elements in data streams <https://www.cse.ust.hk/~raywong/comp5331/References/EfficientComputationOfFrequentAndTop-kElementsInDataStreams.pdf>`_ by A.Metwally, D.Agrawal and A.Abbadi.

Reservoir Sample Functions
-------------------------------

Reservoir sample functions produce a sample of a fixed size, unlike
:ref:`TABLESAMPLE <sql-tablesample>`, which samples a percentage of the input.
The result never exceeds the requested size, and each record in the dataset
still has an equal probability of being chosen. See [Vitter1985]_.
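
The idea behind a fixed-size uniform sample can be sketched in a few lines of Java. This is a minimal illustration of the classic replacement scheme described in [Vitter1985]_ (often called Algorithm R), not the code added by this change; the class name ``ReservoirSketch`` and its ``sample`` method are hypothetical and exist only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical illustration class; it is not part of Presto.
final class ReservoirSketch
{
    private ReservoirSketch() {}

    // Returns a uniform random sample of at most sampleSize elements.
    static <T> List<T> sample(Iterable<T> values, int sampleSize, Random random)
    {
        if (sampleSize < 0) {
            throw new IllegalArgumentException("sampleSize must not be negative");
        }
        List<T> reservoir = new ArrayList<>();
        long processed = 0;
        for (T value : values) {
            processed++;
            if (reservoir.size() < sampleSize) {
                // The first sampleSize values always enter the reservoir.
                reservoir.add(value);
            }
            else {
                // Pick a slot uniformly in [0, processed); the new value
                // replaces an existing one only if the slot falls inside
                // the reservoir, i.e. with probability sampleSize / processed.
                long slot = (long) (random.nextDouble() * processed);
                if (slot < sampleSize) {
                    reservoir.set((int) slot, value);
                }
            }
        }
        return reservoir;
    }
}
```

Calling ``ReservoirSketch.sample(values, 5, new Random())`` on ten values returns five of them; on three values it returns all three, because the reservoir never fills.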

.. function:: reservoir_sample(initial_sample: array(T), initial_processed_count: bigint, values_to_sample: T, desired_sample_size: int) -> row(processed_count: bigint, sample: array(T))

Computes a new reservoir sample given:

- ``initial_sample``: an initial sample array, or ``NULL`` if creating a new
sample.
- ``initial_processed_count``: the number of records processed to generate
the initial sample array. This should be 0 or ``NULL`` if
``initial_sample`` is ``NULL``.
- ``values_to_sample``: the column to sample from.
- ``desired_sample_size``: the size of the reservoir sample.

The function outputs a single row type with two columns:

#. Processed count: The total number of rows the function sampled
from. It includes the total from the ``initial_processed_count``,
if provided.

#. Reservoir sample: An array with length equivalent to the minimum of
``desired_sample_size`` and the number of values in the
``values_to_sample`` argument.


.. code-block:: sql

WITH result as (
SELECT
reservoir_sample(NULL, 0, col, 5) as reservoir
FROM (
VALUES
1, 2, 3, 4, 5, 6, 7, 8, 9, 0
) as t(col)
)
SELECT
reservoir.processed_count, reservoir.sample
FROM result;

.. code-block:: none

processed_count | sample
-----------------+-----------------
10 | [1, 2, 8, 4, 5]

To merge older samples with new data, supply valid values for the
``initial_sample`` and ``initial_processed_count`` arguments.

.. code-block:: sql

WITH initial_sample as (
SELECT
reservoir_sample(NULL, 0, col, 3) as reservoir
FROM (
VALUES
0, 1, 2, 3, 4
) as t(col)
),
new_sample as (
SELECT
reservoir_sample(
(SELECT reservoir.sample FROM initial_sample),
(SELECT reservoir.processed_count FROM initial_sample),
col,
3
) as result
FROM (
VALUES
5, 6, 7, 8, 9
) as t(col)
)
SELECT
result.processed_count, result.sample
FROM new_sample;

.. code-block:: none

processed_count | sample
-----------------+-----------
10 | [8, 3, 2]
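
Why the processed count has to travel with the sample can be seen in a small Java sketch. It assumes the initial sample was drawn uniformly and holds the smaller of the sample size and the processed count; under that assumption, continuing the replacement scheme from the stored count keeps the result uniform over old and new records. The class ``ResumableReservoir`` is hypothetical and is not the implementation in this change.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical illustration class; it is not part of Presto.
final class ResumableReservoir<T>
{
    private final List<T> sample;
    private final int maxSize;
    private final Random random;
    private long processedCount;

    // initialSample may be null, which is treated as an empty sample.
    ResumableReservoir(List<T> initialSample, long initialProcessedCount, int maxSize, Random random)
    {
        if (maxSize < 0 || initialProcessedCount < 0) {
            throw new IllegalArgumentException("sizes and counts must not be negative");
        }
        this.sample = initialSample == null ? new ArrayList<>() : new ArrayList<>(initialSample);
        // Assumption of this sketch: a valid earlier sample holds exactly
        // min(maxSize, initialProcessedCount) elements.
        if (this.sample.size() != Math.min(maxSize, initialProcessedCount)) {
            throw new IllegalArgumentException("initial sample does not match the processed count");
        }
        this.maxSize = maxSize;
        this.random = random;
        this.processedCount = initialProcessedCount;
    }

    void add(T value)
    {
        processedCount++;
        if (sample.size() < maxSize) {
            sample.add(value);
            return;
        }
        // The replacement probability is maxSize / processedCount, where
        // processedCount includes the records behind the initial sample.
        long slot = (long) (random.nextDouble() * processedCount);
        if (slot < maxSize) {
            sample.set((int) slot, value);
        }
    }

    long processedCount()
    {
        return processedCount;
    }

    List<T> sample()
    {
        return new ArrayList<>(sample);
    }
}
```

Starting from a three-element sample with a processed count of 5 and adding five more values leaves a processed count of 10 and a sample of three elements, mirroring the shape of the query result above. Without the stored count, the new values would be weighted as if the old sample represented only three records.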

To sample entire rows of a table, pass a ``ROW``-typed input whose
subfields correspond to the columns of the source table.

.. code-block:: sql

WITH result as (
SELECT
reservoir_sample(NULL, 0, CAST(row(idx, val) AS row(idx int, val varchar)), 2) as reservoir
FROM (
VALUES
(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')
) as t(idx, val)
)
SELECT
reservoir.processed_count, reservoir.sample
FROM result;

.. code-block:: none

processed_count | sample
-----------------+----------------------------------
5 | [{idx=1, val=a}, {idx=5, val=e}]



---------------------------

Expand All @@ -978,3 +1089,5 @@ where :math:`f(x)` is the partial density function of :math:`x`.

.. [Efraimidis2006] Efraimidis, Pavlos S.; Spirakis, Paul G. (2006-03-16). "Weighted random sampling with a reservoir".
Information Processing Letters. 97 (5): 181–185.

.. [Vitter1985] Vitter, Jeffrey S. "Random sampling with a reservoir." ACM Transactions on Mathematical Software (TOMS) 11.1 (1985): 37-57.
1 change: 1 addition & 0 deletions presto-docs/src/main/sphinx/sql/select.rst
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,7 @@ after the ``OFFSET`` clause::
4
(2 rows)

.. _sql-tablesample:

TABLESAMPLE
-----------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import com.facebook.presto.operator.aggregation.noisyaggregation.NoisyApproximateSetSfmAggregationDefaultPrecision;
import com.facebook.presto.operator.aggregation.noisyaggregation.NoisyCountIfGaussianAggregation;
import com.facebook.presto.operator.aggregation.noisyaggregation.SfmSketchMergeAggregation;
import com.facebook.presto.operator.aggregation.reservoirsample.ReservoirSampleFunction;
import com.facebook.presto.operator.scalar.ArrayAllMatchFunction;
import com.facebook.presto.operator.scalar.ArrayAnyMatchFunction;
import com.facebook.presto.operator.scalar.ArrayCardinalityFunction;
Expand Down Expand Up @@ -970,7 +971,8 @@ private List<? extends SqlFunction> getBuildInFunctions(FeaturesConfig featuresC
.function(DISTINCT_TYPE_DISTINCT_FROM_OPERATOR)
.functions(DISTINCT_TYPE_HASH_CODE_OPERATOR, DISTINCT_TYPE_XX_HASH_64_OPERATOR)
.function(DISTINCT_TYPE_INDETERMINATE_OPERATOR)
.codegenScalars(MapFilterFunction.class);
.codegenScalars(MapFilterFunction.class)
.aggregate(ReservoirSampleFunction.class);

switch (featuresConfig.getRegexLibrary()) {
case JONI:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator.aggregation.reservoirsample;
package com.facebook.presto.operator.aggregation.differentialentropy;

import io.airlift.slice.SizeOf;
import io.airlift.slice.SliceInput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package com.facebook.presto.operator.aggregation.differentialentropy;

import com.facebook.presto.operator.aggregation.reservoirsample.UnweightedDoubleReservoirSample;
import com.facebook.presto.spi.PrestoException;
import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator.aggregation.reservoirsample;
package com.facebook.presto.operator.aggregation.differentialentropy;

import io.airlift.slice.SizeOf;
import io.airlift.slice.SliceInput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package com.facebook.presto.operator.aggregation.differentialentropy;

import com.facebook.presto.operator.aggregation.reservoirsample.WeightedDoubleReservoirSample;
import com.facebook.presto.spi.PrestoException;
import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator.aggregation.reservoirsample;

import com.facebook.presto.common.array.ObjectBigArray;
import com.facebook.presto.operator.aggregation.state.AbstractGroupedAccumulatorState;
import org.openjdk.jol.info.ClassLayout;

import static java.util.Objects.requireNonNull;

public class GroupedReservoirSampleState
extends AbstractGroupedAccumulatorState
implements ReservoirSampleState
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(GroupedReservoirSampleState.class).instanceSize();
private final ObjectBigArray<ReservoirSample> samples = new ObjectBigArray<>();
private long size;

@Override
public long getEstimatedSize()
{
return INSTANCE_SIZE + size + samples.sizeOf();
}

@Override
public void ensureCapacity(long size)
{
samples.ensureCapacity(size);
}

@Override
public ReservoirSample get()
{
return samples.get(getGroupId());
}

@Override
public void set(ReservoirSample value)
{
requireNonNull(value, "value is null");
ReservoirSample previous = get();
if (previous != null) {
size -= previous.estimatedInMemorySize();
}

samples.set(getGroupId(), value);
size += value.estimatedInMemorySize();
}
}