Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@
import static org.apache.calcite.rex.RexWindowBounds.UNBOUNDED_PRECEDING;
import static org.apache.calcite.rex.RexWindowBounds.following;
import static org.apache.calcite.rex.RexWindowBounds.preceding;
import static org.opensearch.sql.calcite.utils.CalciteToolsHelper.STDDEV_POP_NULLABLE;
import static org.opensearch.sql.calcite.utils.CalciteToolsHelper.STDDEV_SAMP_NULLABLE;
import static org.opensearch.sql.calcite.utils.CalciteToolsHelper.VAR_POP_NULLABLE;
import static org.opensearch.sql.calcite.utils.CalciteToolsHelper.VAR_SAMP_NULLABLE;
import static org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils.TransferUserDefinedAggFunction;

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
Expand All @@ -25,7 +20,6 @@
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.rex.RexWindowBound;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.RelBuilder;
import org.opensearch.sql.ast.AbstractNodeVisitor;
Expand All @@ -37,9 +31,8 @@
import org.opensearch.sql.ast.tree.Relation;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.calcite.udf.udaf.PercentileApproxFunction;
import org.opensearch.sql.calcite.udf.udaf.TakeAggFunction;
import org.opensearch.sql.expression.function.BuiltinFunctionName;
import org.opensearch.sql.expression.function.PPLFuncImpTable;

public interface PlanUtils {

Expand Down Expand Up @@ -232,56 +225,7 @@ static RelBuilder.AggCall makeAggCall(
boolean distinct,
RexNode field,
List<RexNode> argList) {
switch (functionName) {
case MAX:
return context.relBuilder.max(field);
case MIN:
return context.relBuilder.min(field);
case AVG:
return context.relBuilder.avg(distinct, null, field);
case COUNT:
return context.relBuilder.count(
distinct, null, field == null ? ImmutableList.of() : ImmutableList.of(field));
case SUM:
return context.relBuilder.sum(distinct, null, field);
// case MEAN:
// throw new UnsupportedOperationException("MEAN is not supported in PPL");
// case STDDEV:
// return context.relBuilder.aggregateCall(SqlStdOperatorTable.STDDEV,
// field);
case VARSAMP:
return context.relBuilder.aggregateCall(VAR_SAMP_NULLABLE, field);
case VARPOP:
return context.relBuilder.aggregateCall(VAR_POP_NULLABLE, field);
case STDDEV_POP:
return context.relBuilder.aggregateCall(STDDEV_POP_NULLABLE, field);
case STDDEV_SAMP:
return context.relBuilder.aggregateCall(STDDEV_SAMP_NULLABLE, field);
// case PERCENTILE_APPROX:
// return
// context.relBuilder.aggregateCall(SqlStdOperatorTable.PERCENTILE_CONT, field);
case TAKE:
return TransferUserDefinedAggFunction(
TakeAggFunction.class,
"TAKE",
UserDefinedFunctionUtils.getReturnTypeInferenceForArray(),
List.of(field),
argList,
context.relBuilder);
case PERCENTILE_APPROX:
List<RexNode> newArgList = new ArrayList<>(argList);
newArgList.add(context.rexBuilder.makeFlag(field.getType().getSqlTypeName()));
return TransferUserDefinedAggFunction(
PercentileApproxFunction.class,
"percentile_approx",
ReturnTypes.ARG0_FORCE_NULLABLE,
List.of(field),
newArgList,
context.relBuilder);
default:
throw new UnsupportedOperationException(
"Unexpected aggregation: " + functionName.getName().getFunctionName());
}
return PPLFuncImpTable.INSTANCE.resolveAgg(functionName, distinct, field, argList, context);
}

/** Get all uniq input references from a RexNode. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public static RelBuilder.AggCall TransferUserDefinedAggFunction(
return relBuilder.aggregateCall(sqlUDAF, addArgList);
}

static SqlReturnTypeInference getReturnTypeInferenceForArray() {
public static SqlReturnTypeInference getReturnTypeInferenceForArray() {
return opBinding -> {
RelDataTypeFactory typeFactory = opBinding.getTypeFactory();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ public enum BuiltinFunctionName {
TAKE(FunctionName.of("take")),
// t-digest percentile which is used in OpenSearch core by default.
PERCENTILE_APPROX(FunctionName.of("percentile_approx")),
DISTINCT_COUNT_APPROX(FunctionName.of("distinct_count_approx")),
// Not always an aggregation query
NESTED(FunctionName.of("nested")),

Expand Down Expand Up @@ -317,6 +318,7 @@ public enum BuiltinFunctionName {
.put("take", BuiltinFunctionName.TAKE)
.put("percentile", BuiltinFunctionName.PERCENTILE_APPROX)
.put("percentile_approx", BuiltinFunctionName.PERCENTILE_APPROX)
.put("distinct_count_approx", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
.build();

private static final Map<String, BuiltinFunctionName> WINDOW_FUNC_MAPPING =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@

package org.opensearch.sql.expression.function;

import static org.opensearch.sql.calcite.utils.CalciteToolsHelper.*;
import static org.opensearch.sql.calcite.utils.CalciteToolsHelper.STDDEV_POP_NULLABLE;
import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.TYPE_FACTORY;
import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.getLegacyTypeName;
import static org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils.TransferUserDefinedAggFunction;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.*;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.math.BigDecimal;
import java.util.ArrayList;
Expand All @@ -24,11 +28,23 @@
import org.apache.calcite.sql.fun.SqlLibraryOperators;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.fun.SqlTrimFunction.Flag;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.RelBuilder;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.calcite.udf.udaf.PercentileApproxFunction;
import org.opensearch.sql.calcite.udf.udaf.TakeAggFunction;
import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils;
import org.opensearch.sql.executor.QueryType;

public class PPLFuncImpTable {
/**
 * A lambda function interface which could apply parameters to get AggCall.
 *
 * <p>Implementations are stored per {@link BuiltinFunctionName} and invoked by {@code
 * resolveAgg} to build the aggregate call for that function.
 */
@FunctionalInterface
public interface AggHandler {
/**
 * Build the aggregate call for one aggregate function invocation.
 *
 * @param distinct the distinct flag of the invocation; the built-in AVG, COUNT and SUM handlers
 *     forward it to the RelBuilder, the other built-in handlers ignore it
 * @param field the expression being aggregated; may be {@code null} (the built-in COUNT handler
 *     treats {@code null} as "no operand")
 * @param argList the additional arguments of the invocation; only some handlers (e.g. TAKE,
 *     PERCENTILE_APPROX) read it
 * @param context the plan context that supplies the {@code relBuilder} / {@code rexBuilder} used
 *     to construct the call
 * @return the aggregate call produced for this invocation
 */
RelBuilder.AggCall apply(
boolean distinct, RexNode field, List<RexNode> argList, CalcitePlanContext context);
}

public interface FunctionImp {
RelDataType ANY_TYPE = TYPE_FACTORY.createSqlType(SqlTypeName.ANY);
Expand Down Expand Up @@ -88,7 +104,9 @@ default List<RelDataType> getParams() {
static {
final Builder builder = new Builder();
builder.populate();
INSTANCE = new PPLFuncImpTable(builder);
final AggBuilder aggBuilder = new AggBuilder();
aggBuilder.populate();
INSTANCE = new PPLFuncImpTable(builder, aggBuilder);
}

/**
Expand All @@ -107,12 +125,32 @@ default List<RelDataType> getParams() {
private final Map<BuiltinFunctionName, List<Pair<CalciteFuncSignature, FunctionImp>>>
externalFunctionRegistry;

private PPLFuncImpTable(Builder builder) {
/**
* The registry for built-in agg functions. Agg Functions defined by the PPL specification, whose
* implementations are independent of any specific data storage, should be registered here
* internally.
*/
private final ImmutableMap<BuiltinFunctionName, AggHandler> aggFunctionRegistry;

/**
* The external agg function registry. Agg Functions whose implementations depend on a specific
* data engine should be registered here. This reduces coupling between the core module and
* particular storage backends.
*/
private final Map<BuiltinFunctionName, AggHandler> aggExternalFunctionRegistry;

private PPLFuncImpTable(Builder builder, AggBuilder aggBuilder) {
final ImmutableMap.Builder<BuiltinFunctionName, List<Pair<CalciteFuncSignature, FunctionImp>>>
mapBuilder = ImmutableMap.builder();
builder.map.forEach((k, v) -> mapBuilder.put(k, List.copyOf(v)));
this.functionRegistry = ImmutableMap.copyOf(mapBuilder.build());
this.externalFunctionRegistry = new HashMap<>();

final ImmutableMap.Builder<BuiltinFunctionName, AggHandler> aggMapBuilder =
ImmutableMap.builder();
aggBuilder.map.forEach(aggMapBuilder::put);
this.aggFunctionRegistry = ImmutableMap.copyOf(aggMapBuilder.build());
this.aggExternalFunctionRegistry = new HashMap<>();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both externalFunctionRegistry and aggExternalFunctionRegistry should be changed to ConcurrentHashMap, or the synchronized keyword should be added to registerExternalFunction and registerExternalAggFunction, because the PPLFuncImpTable instance is a static member. This is not a problem for now, since we only register external functions when initializing OpenSearchExecutionEngine, but it is a public API, so we had better protect it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both maps have already been changed to ConcurrentHashMap, and the code of registerExternalFunction has been changed slightly. The put in registerExternalAggFunction is already safe. Please check it.

}

/**
Expand All @@ -132,6 +170,33 @@ public void registerExternalFunction(BuiltinFunctionName functionName, FunctionI
}
}

/**
 * Register an aggregate function implementation from external services dynamically.
 *
 * <p>The handler is stored in the external aggregate registry under {@code functionName}; an
 * earlier handler registered under the same name is replaced. {@code resolveAgg} looks in the
 * external registry before the built-in one, so a handler registered here takes precedence over
 * the built-in handler of the same name.
 *
 * <p>NOTE(review): this method performs no synchronization itself; whether concurrent
 * registration is safe depends on the map implementation backing the external registry - confirm.
 *
 * @param functionName the name of the function, has to be defined in BuiltinFunctionName
 * @param functionImp the implementation of the agg function
 */
public void registerExternalAggFunction(
BuiltinFunctionName functionName, AggHandler functionImp) {
aggExternalFunctionRegistry.put(functionName, functionImp);
}

/**
 * Resolve an aggregate function to an aggregate call.
 *
 * <p>The external aggregate registry is consulted first; the built-in registry is used only when
 * no external handler is registered for {@code functionName}.
 *
 * @param functionName the aggregate function to resolve
 * @param distinct the distinct flag, passed through to the handler
 * @param field the expression being aggregated, passed through to the handler
 * @param argList the additional arguments, passed through to the handler
 * @param context the plan context, passed through to the handler
 * @return the aggregate call built by the selected handler
 * @throws IllegalStateException if neither registry has a handler for {@code functionName}
 */
public RelBuilder.AggCall resolveAgg(
    BuiltinFunctionName functionName,
    boolean distinct,
    RexNode field,
    List<RexNode> argList,
    CalcitePlanContext context) {
  final AggHandler external = aggExternalFunctionRegistry.get(functionName);
  // An externally registered handler wins; otherwise fall back to the built-in one.
  final AggHandler resolved = external != null ? external : aggFunctionRegistry.get(functionName);
  if (resolved != null) {
    return resolved.apply(distinct, field, argList, context);
  }
  throw new IllegalStateException(String.format("Cannot resolve function: %s", functionName));
}

public RexNode resolve(final RexBuilder builder, final String functionName, RexNode... args) {
Optional<BuiltinFunctionName> funcNameOpt = BuiltinFunctionName.of(functionName);
if (funcNameOpt.isEmpty()) {
Expand Down Expand Up @@ -464,4 +529,70 @@ public List<RelDataType> getParams() {
return List.of(boolType, boolType);
}
}

private static class AggBuilder {
private final Map<BuiltinFunctionName, AggHandler> map = new HashMap<>();

void register(BuiltinFunctionName functionName, AggHandler aggHandler) {
map.put(functionName, aggHandler);
}

void populate() {
register(MAX, (distinct, field, argList, ctx) -> ctx.relBuilder.max(field));
register(MIN, (distinct, field, argList, ctx) -> ctx.relBuilder.min(field));

register(AVG, (distinct, field, argList, ctx) -> ctx.relBuilder.avg(distinct, null, field));

register(
COUNT,
(distinct, field, argList, ctx) ->
ctx.relBuilder.count(
distinct, null, field == null ? ImmutableList.of() : ImmutableList.of(field)));
register(SUM, (distinct, field, argList, ctx) -> ctx.relBuilder.sum(distinct, null, field));

register(
VARSAMP,
(distinct, field, argList, ctx) ->
ctx.relBuilder.aggregateCall(VAR_SAMP_NULLABLE, field));

register(
VARPOP,
(distinct, field, argList, ctx) -> ctx.relBuilder.aggregateCall(VAR_POP_NULLABLE, field));

register(
STDDEV_SAMP,
(distinct, field, argList, ctx) ->
ctx.relBuilder.aggregateCall(STDDEV_SAMP_NULLABLE, field));

register(
STDDEV_POP,
(distinct, field, argList, ctx) ->
ctx.relBuilder.aggregateCall(STDDEV_POP_NULLABLE, field));

register(
TAKE,
(distinct, field, argList, ctx) ->
TransferUserDefinedAggFunction(
TakeAggFunction.class,
"TAKE",
UserDefinedFunctionUtils.getReturnTypeInferenceForArray(),
List.of(field),
argList,
ctx.relBuilder));

register(
PERCENTILE_APPROX,
(distinct, field, argList, ctx) -> {
List<RexNode> newArgList = new ArrayList<>(argList);
newArgList.add(ctx.rexBuilder.makeFlag(field.getType().getSqlTypeName()));
return TransferUserDefinedAggFunction(
PercentileApproxFunction.class,
"percentile_approx",
ReturnTypes.ARG0_FORCE_NULLABLE,
List.of(field),
newArgList,
ctx.relBuilder);
});
}
}
}
20 changes: 20 additions & 0 deletions docs/user/ppl/cmd/stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,26 @@ Example::
| 2.8613807855648994 |
+--------------------+

DISTINCT_COUNT_APPROX
---------------------

Description
>>>>>>>>>>>
Comment thread
LantaoJin marked this conversation as resolved.

Version: 3.1.0

Usage: DISTINCT_COUNT_APPROX(expr). Returns the approximate number of distinct values of expr, computed with the HyperLogLog++ algorithm.

Example::

PPL> source=accounts | stats distinct_count_approx(gender);
fetched rows / total rows = 1/1
+-------------------------------+
| distinct_count_approx(gender) |
|-------------------------------|
| 2 |
+-------------------------------+

TAKE
----------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.Arrays;
import java.util.List;
import org.json.JSONObject;
import org.junit.Ignore;
import org.junit.jupiter.api.Test;
import org.opensearch.client.Request;

Expand Down Expand Up @@ -515,21 +514,35 @@ public void testCountDistinct() {
}

@Test
public void testCountDistinctWithAlias() {
public void testCountDistinctApprox() {
JSONObject actual =
executeQuery(
String.format(
"source=%s | stats distinct_count(state) as dc by gender", TEST_INDEX_BANK));
verifySchema(actual, schema("gender", "string"), schema("dc", "long"));
"source=%s | stats distinct_count_approx(state) by gender", TEST_INDEX_BANK));
verifySchema(
actual, schema("gender", "string"), schema("distinct_count_approx(state)", "long"));
verifyDataRows(actual, rows(3, "F"), rows(4, "M"));
}

@Ignore("https://github.com/opensearch-project/sql/issues/3353")
public void testApproxCountDistinct() {
@Test
public void testCountDistinctApproxWithAlias() {
JSONObject actual =
executeQuery(
String.format(
"source=%s | stats distinct_count_approx(state) by gender", TEST_INDEX_BANK));
"source=%s | stats distinct_count_approx(state) as dca by gender",
TEST_INDEX_BANK));
verifySchema(actual, schema("gender", "string"), schema("dca", "long"));
verifyDataRows(actual, rows(3, "F"), rows(4, "M"));
}

/** distinct_count with an alias: the result column carries the alias, grouped by gender. */
@Test
public void testCountDistinctWithAlias() {
  final String query =
      String.format(
          "source=%s | stats distinct_count(state) as dc by gender", TEST_INDEX_BANK);
  final JSONObject response = executeQuery(query);

  verifySchema(response, schema("gender", "string"), schema("dc", "long"));
  verifyDataRows(response, rows(3, "F"), rows(4, "M"));
}

@Test
Expand Down
Loading
Loading