Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions presto-docs/src/main/sphinx/presto-cpp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Note: Presto C++ is in active development. See :doc:`Limitations </presto_cpp/li

presto_cpp/installation
presto_cpp/features
presto_cpp/functions
presto_cpp/sidecar
presto_cpp/limitations
presto_cpp/plugin
Expand Down
8 changes: 8 additions & 0 deletions presto-docs/src/main/sphinx/presto_cpp/functions.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
********************
Presto C++ Functions
********************

.. toctree::
:maxdepth: 1

functions/sketch.rst
38 changes: 38 additions & 0 deletions presto-docs/src/main/sphinx/presto_cpp/functions/sketch.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
================
Sketch Functions
================

Sketches are data structures that can approximately answer particular questions
about a dataset when full accuracy is not required. Approximate answers are
often faster and more efficient to compute than exact answers.

Presto C++ provides support for computing some sketches available in the `Apache
DataSketches`_ library.

Theta Sketches
--------------

Theta sketches enable distinct value counting on datasets and also provide the
ability to perform set operations. For more information on Theta sketches,
please see the Apache DataSketches `Theta sketch documentation`_.

.. function:: sketch_theta(x) -> varbinary

Computes a theta sketch from an input dataset. The output from
this function can be used as an input to any of the other functions in the
``sketch_theta_*`` family.

.. function:: sketch_theta_estimate(sketch) -> double

Returns the estimate of distinct values from the input sketch.

.. function:: sketch_theta_summary(sketch) -> row(estimate double, theta double, upper_bound_std double, lower_bound_std double, retained_entries int)

Returns a summary of the input sketch which includes the distinct values
estimate alongside other useful information such as the sketch theta
parameter, current error bounds corresponding to 1 standard deviation, and
the number of retained entries in the sketch.

.. _Apache DataSketches: https://datasketches.apache.org/
.. _Theta sketch documentation: https://datasketches.apache.org/docs/Theta/ThetaSketches.html#theta-sketch-framework
2 changes: 2 additions & 0 deletions presto-native-execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ if(PRESTO_ENABLE_JWT)
add_compile_definitions(PRESTO_ENABLE_JWT)
endif()

find_package(DataSketches)

if("${MAX_LINK_JOBS}")
set_property(GLOBAL APPEND PROPERTY JOB_POOLS "presto_link_job_pool=${MAX_LINK_JOBS}")
else()
Expand Down
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ target_link_libraries(
presto_session_properties
presto_velox_plan_conversion
presto_hive_functions
presto_theta_sketch_functions
velox_abfs
velox_aggregates
velox_caching
Expand Down
4 changes: 4 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "presto_cpp/main/connectors/SystemConnector.h"
#include "presto_cpp/main/connectors/hive/functions/HiveFunctionRegistration.h"
#include "presto_cpp/main/functions/FunctionMetadata.h"
#include "presto_cpp/main/functions/theta_sketch/ThetaSketchRegistration.h"
#include "presto_cpp/main/http/HttpConstants.h"
#include "presto_cpp/main/http/filters/AccessLogFilter.h"
#include "presto_cpp/main/http/filters/HttpEndpointLatencyFilter.h"
Expand Down Expand Up @@ -1451,6 +1452,9 @@ void PrestoServer::registerFunctions() {
velox::connector::hasConnector("hive-hadoop2")) {
hive::functions::registerHiveNativeFunctions();
}

functions::aggregate::theta_sketch::registerAllThetaSketchFunctions(
prestoBuiltinFunctionPrefix_);
}

void PrestoServer::registerRemoteFunctions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ add_library(presto_function_metadata OBJECT FunctionMetadata.cpp)
target_link_libraries(presto_function_metadata presto_common velox_function_registry)

add_subdirectory(dynamic_registry)
add_subdirectory(theta_sketch)

if(PRESTO_ENABLE_REMOTE_FUNCTIONS)
add_subdirectory(remote)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Library holding the Theta sketch functions, built from the aggregate
# implementation (ThetaSketchAggregate.cpp) and ThetaSketchFunctions.cpp.
# NOTE(review): no target_link_libraries() is declared for this target even
# though ThetaSketchAggregate.cpp includes Velox and DataSketches headers;
# presumably the include paths are inherited from an enclosing scope - confirm.
add_library(presto_theta_sketch_functions ThetaSketchAggregate.cpp ThetaSketchFunctions.cpp)

# The tests subdirectory is only added when PRESTO_ENABLE_TESTING is set.
if(PRESTO_ENABLE_TESTING)
add_subdirectory(tests)
endif()
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "presto_cpp/main/functions/theta_sketch/ThetaSketchRegistration.h"

#include "DataSketches/theta_sketch.hpp"
#include "DataSketches/theta_union.hpp"

#include "velox/exec/Aggregate.h"
#include "velox/exec/SimpleAggregateAdapter.h"
#include "velox/functions/prestosql/aggregates/AggregateNames.h"
#include "velox/type/HugeInt.h"

namespace facebook::presto::functions::aggregate {

namespace {

// Unprefixed name of the theta sketch aggregate. registerThetaSketchAggregate()
// prepends the caller-supplied prefix to this to form the registered name.
const char* const kThetaSketch = "sketch_theta";

template <typename T>
class ThetaSketchAggregate {
public:
// Type(s) of input vector(s) wrapped in Row.
using InputType = velox::Row<T>;

// Type of intermediate result
using IntermediateType = velox::Varbinary;

// Type of output vector.
using OutputType = velox::Varbinary;

static constexpr bool default_null_behavior_ = false;

static bool toIntermediate(
velox::exec::out_type<IntermediateType>& out,
velox::exec::optional_arg_type<T> in) {
if (in.has_value()) {
auto updateSketch = datasketches::update_theta_sketch::builder().build();
if constexpr (std::is_same_v<T, velox::int128_t>) {
updateSketch.update(std::to_string(in.value()));
} else if constexpr (
std::is_same_v<T, std::string> ||
std::is_same_v<T, velox::Varbinary>) {
const auto& strView = in.value();
updateSketch.update(std::string(strView.data(), strView.size()));
} else {
updateSketch.update(in.value());
}
datasketches::theta_union thetaUnion =
datasketches::theta_union::builder().build();
thetaUnion.update(updateSketch);
auto compactSketch = thetaUnion.get_result();
out.resize(compactSketch.get_serialized_size_bytes());
auto serializedBytes = compactSketch.serialize();
std::memcpy(out.data(), serializedBytes.data(), out.size());
}
return true;
}

struct AccumulatorType {
datasketches::theta_union thetaUnion =
datasketches::theta_union::builder().build();
datasketches::update_theta_sketch updateSketch =
datasketches::update_theta_sketch::builder().build();

AccumulatorType() = delete;

// Constructor used in initializeNewGroups().
explicit AccumulatorType(
velox::HashStringAllocator* /*allocator*/,
ThetaSketchAggregate* /*fn*/) {}

void updateUnion() {
thetaUnion.update(updateSketch);
updateSketch.reset();
}

// addInput expects one parameter of exec::arg_type<T> for each child-type T
// wrapped in InputType.
bool addInput(
velox::HashStringAllocator* /*allocator*/,
velox::exec::optional_arg_type<T> data) {
if (data.has_value()) {
if constexpr (std::is_same_v<T, velox::int128_t>) {
updateSketch.update(std::to_string(data.value()));
} else if constexpr (
std::is_same_v<T, std::string> ||
std::is_same_v<T, velox::Varbinary>) {
const auto& strView = data.value();
updateSketch.update(std::string(strView.data(), strView.size()));
} else {
updateSketch.update(data.value());
}
}
return true;
}

// combine expects one parameter of exec::arg_type<IntermediateType>.
bool combine(
velox::HashStringAllocator* /*allocator*/,
velox::exec::optional_arg_type<velox::Varbinary> other) {
if (other.has_value()) {
updateUnion();
auto compactSketch = datasketches::wrapped_compact_theta_sketch::wrap(
other->data(), other->size());
thetaUnion.update(compactSketch);
}
return true;
}

bool getResult(velox::exec::out_type<velox::Varbinary>& out) {
updateUnion();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code seems exactly as writeIntermediateResult. Can you abstract a common function for it ?

auto compactSketch = thetaUnion.get_result();
out.resize(compactSketch.get_serialized_size_bytes());
auto serializedBytes = compactSketch.serialize();
std::memcpy(out.data(), serializedBytes.data(), out.size());
return true;
}

bool writeFinalResult(
bool nonNullGroup,
velox::exec::out_type<velox::Varbinary>& out) {
return getResult(out);
}

bool writeIntermediateResult(
bool nonNullGroup,
velox::exec::out_type<velox::Varbinary>& out) {
return getResult(out);
}
};
};

} // namespace

velox::exec::AggregateRegistrationResult registerThetaSketchAggregate(
const std::string& prefix,
bool withCompanionFunctions,
bool overwrite) {
std::vector<std::shared_ptr<velox::exec::AggregateFunctionSignature>>
signatures;
std::string returnType = "varbinary";
std::string intermediateType = "varbinary";

for (const auto& inputType :
{"tinyint",
"smallint",
"integer",
"bigint",
"hugeint",
"real",
"double",
"varchar",
"date",
"timestamp"}) {
signatures.push_back(
velox::exec::AggregateFunctionSignatureBuilder()
.returnType(returnType)
.intermediateType(intermediateType)
.argumentType(inputType)
.build());
}
signatures.push_back(
velox::exec::AggregateFunctionSignatureBuilder()
.integerVariable("a_precision")
.integerVariable("a_scale")
.returnType(returnType)
.intermediateType(intermediateType)
.argumentType("DECIMAL(a_precision, a_scale)")
.build());
signatures.push_back(
velox::exec::AggregateFunctionSignatureBuilder()
.integerVariable("a_precision")
.integerVariable("a_scale")
.returnType(returnType)
.intermediateType(intermediateType)
.argumentType("DECIMAL(a_precision, a_scale)")
.argumentType("double")
.build());

auto name = prefix + kThetaSketch;

return velox::exec::registerAggregateFunction(
name,
std::move(signatures),
[name](
velox::core::AggregationNode::Step step,
const std::vector<velox::TypePtr>& argTypes,
const velox::TypePtr& resultType,
const velox::core::QueryConfig& /*config*/)
-> std::unique_ptr<velox::exec::Aggregate> {
VELOX_CHECK_LE(
argTypes.size(), 1, "{} takes at most one argument", name);
auto inputType = argTypes[0];
if (velox::exec::isRawInput(step)) {
switch (inputType->kind()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The formating of the lines seems off... PTAL.

case velox::TypeKind::TINYINT:
return std::make_unique<velox::exec::SimpleAggregateAdapter<
ThetaSketchAggregate<int8_t>>>(step, argTypes, resultType);
case velox::TypeKind::SMALLINT:
return std::make_unique<velox::exec::SimpleAggregateAdapter<
ThetaSketchAggregate<int16_t>>>(step, argTypes, resultType);
case velox::TypeKind::INTEGER:
return std::make_unique<velox::exec::SimpleAggregateAdapter<
ThetaSketchAggregate<int32_t>>>(step, argTypes, resultType);
case velox::TypeKind::BIGINT:
return std::make_unique<velox::exec::SimpleAggregateAdapter<
ThetaSketchAggregate<int64_t>>>(step, argTypes, resultType);
case velox::TypeKind::HUGEINT:
return std::make_unique<velox::exec::SimpleAggregateAdapter<
ThetaSketchAggregate<velox::int128_t>>>(
step, argTypes, resultType);
case velox::TypeKind::REAL:
return std::make_unique<velox::exec::SimpleAggregateAdapter<
ThetaSketchAggregate<float>>>(step, argTypes, resultType);
case velox::TypeKind::DOUBLE:
return std::make_unique<velox::exec::SimpleAggregateAdapter<
ThetaSketchAggregate<double>>>(step, argTypes, resultType);
case velox::TypeKind::VARCHAR:
return std::make_unique<velox::exec::SimpleAggregateAdapter<
ThetaSketchAggregate<std::string>>>(
step, argTypes, resultType);
case velox::TypeKind::TIMESTAMP:
return std::make_unique<velox::exec::SimpleAggregateAdapter<
ThetaSketchAggregate<typename velox::TypeTraits<
velox::TypeKind::TIMESTAMP>::NativeType>>>(
step, argTypes, resultType);
default:
VELOX_FAIL(
"Unknown input type for {} aggregation {}",
name,
inputType->kindName());
}
} else {
return std::make_unique<velox::exec::SimpleAggregateAdapter<
ThetaSketchAggregate<velox::Varbinary>>>(
step, argTypes, resultType);
}
},
withCompanionFunctions,
overwrite);
}

} // namespace facebook::presto::functions::aggregate
Loading
Loading