Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Add quantile aggregate. #636

Merged
merged 4 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions omniscidb/IR/Expr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,8 @@ ExprPtr AggExpr::withType(const Type* new_type) const {
if (type_->equal(new_type)) {
return shared_from_this();
}
return makeExpr<AggExpr>(new_type, agg_type_, arg_, is_distinct_, arg1_);
return makeExpr<AggExpr>(
new_type, agg_type_, arg_, is_distinct_, arg1_, interpolation_);
}

ExprPtr CaseExpr::withType(const Type* new_type) const {
Expand Down Expand Up @@ -1193,7 +1194,8 @@ bool AggExpr::operator==(const Expr& rhs) const {
return false;
}
const AggExpr& rhs_ae = dynamic_cast<const AggExpr&>(rhs);
if (agg_type_ != rhs_ae.aggType() || is_distinct_ != rhs_ae.isDistinct()) {
if (agg_type_ != rhs_ae.aggType() || is_distinct_ != rhs_ae.isDistinct() ||
interpolation_ != rhs_ae.interpolation_) {
return false;
}
if (arg_.get() == rhs_ae.arg()) {
Expand Down Expand Up @@ -1598,6 +1600,9 @@ std::string AggExpr::toString() const {
if (arg1_) {
ss << arg1_->toString();
}
if (agg_type_ == AggType::kQuantile) {
ss << " " << interpolation_;
}
ss << ")";
return ss.str();
}
Expand Down Expand Up @@ -2059,6 +2064,9 @@ size_t AggExpr::hash() const {
if (arg1_) {
boost::hash_combine(*hash_, arg1_->hash());
}
if (agg_type_ == AggType::kQuantile) {
boost::hash_combine(*hash_, static_cast<int>(interpolation_));
}
}
return *hash_;
}
Expand Down
20 changes: 17 additions & 3 deletions omniscidb/IR/Expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -787,11 +787,22 @@ class LikelihoodExpr : public Expr {
*/
class AggExpr : public Expr {
public:
AggExpr(const Type* type, AggType a, ExprPtr arg, bool d, ExprPtr arg1)
: Expr(type, true), agg_type_(a), arg_(arg), is_distinct_(d), arg1_(arg1) {
AggExpr(const Type* type,
AggType a,
ExprPtr arg,
bool d,
ExprPtr arg1,
Interpolation interpolation = Interpolation::kLinear)
: Expr(type, true)
, agg_type_(a)
, arg_(arg)
, is_distinct_(d)
, arg1_(arg1)
, interpolation_(interpolation) {
if (arg1) {
if (agg_type_ == AggType::kApproxCountDistinct ||
agg_type_ == AggType::kApproxQuantile || agg_type_ == AggType::kTopK) {
agg_type_ == AggType::kApproxQuantile || agg_type_ == AggType::kTopK ||
agg_type_ == AggType::kQuantile) {
CHECK(arg1_->is<Constant>());
} else {
CHECK(agg_type_ == AggType::kCorr);
Expand All @@ -804,6 +815,7 @@ class AggExpr : public Expr {
bool isDistinct() const { return is_distinct_; }
const Expr* arg1() const { return arg1_.get(); }
ExprPtr arg1Shared() const { return arg1_; }
Interpolation interpolation() const { return interpolation_; }
ExprPtr withType(const Type* new_type) const override;
bool operator==(const Expr& rhs) const override;
std::string toString() const override;
Expand All @@ -817,6 +829,8 @@ class AggExpr : public Expr {
// APPROX_COUNT_DISTINCT error_rate, APPROX_QUANTILE quantile,
// CORR second arg
ExprPtr arg1_;
// QUANTILE interpolation
Interpolation interpolation_;
};

/*
Expand Down
8 changes: 6 additions & 2 deletions omniscidb/IR/ExprRewriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,12 @@ class ExprRewriter : public ExprVisitor<ExprPtr> {
ExprPtr new_arg = agg->arg() ? visit(agg->arg()) : nullptr;
ExprPtr new_arg1 = agg->arg1() ? visit(agg->arg1()) : nullptr;
if (new_arg.get() != agg->arg() || new_arg1.get() != agg->arg1()) {
return hdk::ir::makeExpr<hdk::ir::AggExpr>(
agg->type(), agg->aggType(), new_arg, agg->isDistinct(), new_arg1);
return hdk::ir::makeExpr<hdk::ir::AggExpr>(agg->type(),
agg->aggType(),
new_arg,
agg->isDistinct(),
new_arg1,
agg->interpolation());
}
return defaultResult(agg);
}
Expand Down
23 changes: 23 additions & 0 deletions omniscidb/IR/OpType.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ inline std::string toString(hdk::ir::AggType agg) {
return "SINGLE_VALUE";
case hdk::ir::AggType::kTopK:
return "TOP_K";
case hdk::ir::AggType::kQuantile:
return "QUANTILE";
case hdk::ir::AggType::kStdDevSamp:
return "STDDEV";
case hdk::ir::AggType::kCorr:
Expand Down Expand Up @@ -153,6 +155,23 @@ inline std::string toString(hdk::ir::WindowFunctionKind kind) {
return "";
}

// Maps a quantile interpolation mode to its upper-case textual name.
// Every enumerator is handled explicitly (no default label); a value that
// matches none of them falls out of the switch and is reported via LOG(FATAL).
inline std::string toString(hdk::ir::Interpolation interpolation) {
  switch (interpolation) {
    case hdk::ir::Interpolation::kLinear:
      return "LINEAR";
    case hdk::ir::Interpolation::kMidpoint:
      return "MIDPOINT";
    case hdk::ir::Interpolation::kNearest:
      return "NEAREST";
    case hdk::ir::Interpolation::kHigher:
      return "HIGHER";
    case hdk::ir::Interpolation::kLower:
      return "LOWER";
  }
  // Only reachable for a value outside the enumerators above.
  LOG(FATAL) << "Invalid interpolation kind " << static_cast<int>(interpolation);
  return "";
}

namespace hdk::ir {

inline std::ostream& operator<<(std::ostream& os, hdk::ir::OpType op) {
Expand All @@ -171,4 +190,8 @@ inline std::ostream& operator<<(std::ostream& os, hdk::ir::WindowFunctionKind ki
return os << toString(kind);
}

// Streams the textual name of an interpolation mode (as produced by toString).
inline std::ostream& operator<<(std::ostream& os, hdk::ir::Interpolation interpolation) {
  os << toString(interpolation);
  return os;
}

} // namespace hdk::ir
3 changes: 3 additions & 0 deletions omniscidb/IR/OpTypeEnums.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ enum class AggType {
kSample,
kSingleValue,
kTopK,
kQuantile,
// Compound aggregates
kStdDevSamp,
kCorr,
Expand All @@ -110,4 +111,6 @@ enum class WindowFunctionKind {
SumInternal // For deserialization from Calcite only. Gets rewritten to a regular SUM.
};

// Interpolation mode carried by QUANTILE aggregates.
// The enumerator order is kept as declared, so the underlying integer values
// (kLower == 0 ... kLinear == 4) are unchanged.
enum class Interpolation {
  kLower,
  kHigher,
  kNearest,
  kMidpoint,
  kLinear
};

} // namespace hdk::ir
59 changes: 47 additions & 12 deletions omniscidb/QueryBuilder/QueryBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ BuilderExpr BuilderExpr::approxCountDist() const {

BuilderExpr BuilderExpr::approxQuantile(double val) const {
if (!expr_->type()->isNumber()) {
throw InvalidQueryError() << "Unsupported type for sum aggregate: "
throw InvalidQueryError() << "Unsupported type for ApproxQuantile aggregate: "
<< expr_->type()->toString();
}
if (val < 0.0 || val > 1.0) {
Expand Down Expand Up @@ -613,6 +613,33 @@ BuilderExpr BuilderExpr::bottomK(int count) const {
return topK(-count);
}

/**
 * Builds an exact QUANTILE aggregate over this expression.
 *
 * @param val            Requested quantile; must lie in [0.0, 1.0].
 * @param interpolation  Interpolation mode stored on the resulting AggExpr.
 * @return Builder expression wrapping the aggregate, named "quantile" when this
 *         expression has no name and "<name>_quantile" otherwise.
 * @throws InvalidQueryError if the input type is not a number, date/time or
 *         interval, or if val is not within [0.0, 1.0] (NaN included).
 *
 * Result type: the input type, except that integer inputs become fp64 for the
 * kMidpoint and kLinear modes; the type is canonicalized in all cases.
 */
BuilderExpr BuilderExpr::quantile(double val, Interpolation interpolation) const {
  if (!expr_->type()->isNumber() && !expr_->type()->isDateTime() &&
      !expr_->type()->isInterval()) {
    throw InvalidQueryError() << "Unsupported type for quantile aggregate: "
                              << expr_->type()->toString();
  }
  // Written as a negated in-range test so that NaN is rejected as well:
  // `val < 0.0 || val > 1.0` is false for NaN and would let it through.
  if (!(val >= 0.0 && val <= 1.0)) {
    throw InvalidQueryError() << "Quantile expects argument between 0.0 and 1.0 but got "
                              << val;
  }

  // The quantile value travels as an fp64 constant in the aggregate's arg1.
  Datum d;
  d.doubleval = val;
  auto cst = makeExpr<Constant>(builder_->ctx_.fp64(), false, d);

  auto res_type = expr_->type();
  const bool widens_integers = interpolation == Interpolation::kMidpoint ||
                               interpolation == Interpolation::kLinear;
  if (widens_integers && res_type->isInteger()) {
    res_type = builder_->ctx_.fp64();
  }
  res_type = res_type->canonicalize();

  // QUANTILE is never built as a DISTINCT aggregate here.
  auto agg =
      makeExpr<AggExpr>(res_type, AggType::kQuantile, expr_, false, cst, interpolation);
  auto name = name_.empty() ? "quantile" : name_ + "_quantile";
  // NOTE(review): the trailing `true` is presumably the auto-generated-name flag
  // of BuilderExpr — confirm against its constructor.
  return {builder_, agg, name, true};
}

BuilderExpr BuilderExpr::stdDev() const {
if (!expr_->type()->isNumber()) {
throw InvalidQueryError() << "Non-numeric type " << expr_->type()->toString()
Expand Down Expand Up @@ -704,6 +731,7 @@ BuilderExpr BuilderExpr::agg(const std::string& agg_str, BuilderExpr arg) const
{"top_k", AggType::kTopK},
{"bottomk", AggType::kTopK},
{"bottom_k", AggType::kTopK},
{"quantile", AggType::kQuantile},
{"stddev", AggType::kStdDevSamp},
{"stddev_samp", AggType::kStdDevSamp},
{"stddev samp", AggType::kStdDevSamp},
Expand All @@ -716,8 +744,8 @@ BuilderExpr BuilderExpr::agg(const std::string& agg_str, BuilderExpr arg) const
}

auto kind = agg_names.at(agg_str_lower);
if (kind == AggType::kApproxQuantile && !arg.expr()) {
throw InvalidQueryError("Missing argument for approximate quantile aggregate.");
if ((kind == AggType::kApproxQuantile || kind == AggType::kQuantile) && !arg.expr()) {
throw InvalidQueryError("Missing argument for quantile aggregate.");
}
if (kind == AggType::kTopK) {
if (!arg.expr()) {
Expand Down Expand Up @@ -751,8 +779,10 @@ BuilderExpr BuilderExpr::agg(AggType agg_kind, const BuilderExpr& arg) const {
return agg(agg_kind, false, arg);
}

BuilderExpr BuilderExpr::agg(AggType agg_kind, double val) const {
return agg(agg_kind, false, val);
BuilderExpr BuilderExpr::agg(AggType agg_kind,
double val,
Interpolation interpolation) const {
return agg(agg_kind, false, val, interpolation);
}

BuilderExpr BuilderExpr::agg(AggType agg_kind, int val) const {
Expand All @@ -761,21 +791,21 @@ BuilderExpr BuilderExpr::agg(AggType agg_kind, int val) const {

BuilderExpr BuilderExpr::agg(AggType agg_kind,
bool is_distinct,
const BuilderExpr& arg) const {
const BuilderExpr& arg,
Interpolation interpolation) const {
if (is_distinct && agg_kind != AggType::kCount) {
throw InvalidQueryError() << "Distinct property cannot be set to true for "
<< agg_kind << " aggregate.";
}
if (arg.expr() && agg_kind != AggType::kApproxQuantile && agg_kind != AggType::kCorr &&
agg_kind != AggType::kTopK) {
agg_kind != AggType::kTopK && agg_kind != AggType::kQuantile) {
throw InvalidQueryError() << "Aggregate argument is supported for approximate "
"quantile and corr only but provided for "
<< agg_kind;
}
if (agg_kind == AggType::kApproxQuantile) {
if (agg_kind == AggType::kApproxQuantile || agg_kind == AggType::kQuantile) {
if (!arg.expr()->is<Constant>() || !arg.type()->isFloatingPoint()) {
throw InvalidQueryError() << "Expected fp constant argumnt for approximate "
"quantile. Provided: "
throw InvalidQueryError() << "Expected fp constant argumnt for quantile. Provided: "
<< arg.expr()->toString();
}
}
Expand Down Expand Up @@ -808,6 +838,8 @@ BuilderExpr BuilderExpr::agg(AggType agg_kind,
return singleValue();
case AggType::kTopK:
return topK(arg.expr()->as<Constant>()->intVal());
case AggType::kQuantile:
return quantile(arg.expr()->as<Constant>()->fpVal(), interpolation);
case AggType::kStdDevSamp:
return stdDev();
case AggType::kCorr:
Expand All @@ -818,12 +850,15 @@ BuilderExpr BuilderExpr::agg(AggType agg_kind,
throw InvalidQueryError() << "Unsupported aggregate type: " << agg_kind;
}

BuilderExpr BuilderExpr::agg(AggType agg_kind, bool is_distinct, double val) const {
BuilderExpr BuilderExpr::agg(AggType agg_kind,
bool is_distinct,
double val,
Interpolation interpolation) const {
BuilderExpr arg;
if (val != HUGE_VAL) {
arg = builder_->cst(val);
}
return agg(agg_kind, is_distinct, arg);
return agg(agg_kind, is_distinct, arg, interpolation);
}

BuilderExpr BuilderExpr::extract(DateExtractField field) const {
Expand Down
14 changes: 11 additions & 3 deletions omniscidb/QueryBuilder/QueryBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class BuilderExpr {
BuilderExpr singleValue() const;
BuilderExpr topK(int count) const;
BuilderExpr bottomK(int count) const;
BuilderExpr quantile(double val,
Interpolation interpolation = Interpolation::kLinear) const;
BuilderExpr stdDev() const;
BuilderExpr corr(const BuilderExpr& arg) const;

Expand All @@ -90,12 +92,18 @@ class BuilderExpr {
BuilderExpr agg(const std::string& agg_str, double val = HUGE_VAL) const;
BuilderExpr agg(const std::string& agg_str, int val) const;
BuilderExpr agg(AggType agg_kind, const BuilderExpr& arg) const;
BuilderExpr agg(AggType agg_kind, double val) const;
BuilderExpr agg(AggType agg_kind,
double val,
Interpolation interpolation = Interpolation::kLinear) const;
BuilderExpr agg(AggType agg_kind, int val) const;
BuilderExpr agg(AggType agg_kind, bool is_dinstinct, const BuilderExpr& arg) const;
BuilderExpr agg(AggType agg_kind,
bool is_dinstinct,
const BuilderExpr& arg,
Interpolation interpolation = Interpolation::kLinear) const;
BuilderExpr agg(AggType agg_kind,
bool is_dinstinct = false,
double val = HUGE_VAL) const;
double val = HUGE_VAL,
Interpolation interpolation = Interpolation::kLinear) const;

BuilderExpr extract(DateExtractField field) const;
BuilderExpr extract(const std::string& field) const;
Expand Down
3 changes: 3 additions & 0 deletions omniscidb/QueryEngine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ set(query_engine_source_files
OutputBufferInitialization.cpp
QueryPhysicalInputsCollector.cpp
PlanState.cpp
QuantileRuntime.cpp
QueryRewrite.cpp
QueryTemplateGenerator.cpp
QueryExecutionContext.cpp
Expand Down Expand Up @@ -136,6 +137,7 @@ set(group_by_hash_test_files
GroupByHashTest.cpp
MurmurHash.cpp
DynamicWatchdog.cpp
QuantileRuntime.cpp
RuntimeFunctions.cpp
)

Expand Down Expand Up @@ -182,6 +184,7 @@ set(hdk_default_runtime_functions_module_dependencies
DecodersImpl.h
${CMAKE_CURRENT_SOURCE_DIR}/../Utils/ExtractFromTime.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../Utils/StringLike.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../Shared/quantile.h
GroupByRuntime.cpp
TopKRuntime.cpp)

Expand Down
11 changes: 10 additions & 1 deletion omniscidb/QueryEngine/CalciteDeserializerUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@

const hdk::ir::Type* get_agg_type(hdk::ir::AggType agg_kind,
const hdk::ir::Expr* arg_expr,
bool bigint_count) {
bool bigint_count,
hdk::ir::Interpolation interpolation) {
auto& ctx = arg_expr ? arg_expr->type()->ctx() : hdk::ir::Context::defaultCtx();
switch (agg_kind) {
case hdk::ir::AggType::kCount:
Expand All @@ -51,6 +52,14 @@ const hdk::ir::Type* get_agg_type(hdk::ir::AggType agg_kind,
return arg_expr->type();
case hdk::ir::AggType::kTopK:
return ctx.arrayVarLen(arg_expr->type(), 4, false);
case hdk::ir::AggType::kQuantile:
if (interpolation == hdk::ir::Interpolation::kMidpoint ||
interpolation == hdk::ir::Interpolation::kLinear) {
if (arg_expr->type()->isInteger()) {
return ctx.fp64();
}
}
return arg_expr->type()->canonicalize();
default:
CHECK(false);
}
Expand Down
Loading