Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.udf.udaf;

import java.util.List;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilder;
import org.opensearch.sql.calcite.CalcitePlanContext;

@FunctionalInterface
public interface AggHandler {
RelBuilder.AggCall apply(
boolean distinct, RexNode field, List<RexNode> argList, CalcitePlanContext context);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.udf.udaf;

import static org.opensearch.sql.calcite.utils.CalciteToolsHelper.*;
import static org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils.TransferUserDefinedAggFunction;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.*;

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.type.ReturnTypes;
import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils;
import org.opensearch.sql.expression.function.BuiltinFunctionName;

public class AggTransferFunctionMap {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a java doc here

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add the doc inside PPLFuncImpTable

public static Map<BuiltinFunctionName, AggHandler> AGG_FUNCTION_MAP;

static {
AGG_FUNCTION_MAP = new HashMap<>();
AGG_FUNCTION_MAP.put(MAX, (distinct, field, argList, ctx) -> ctx.relBuilder.max(field));

AGG_FUNCTION_MAP.put(MIN, (distinct, field, argList, ctx) -> ctx.relBuilder.min(field));

AGG_FUNCTION_MAP.put(
AVG, (distinct, field, argList, ctx) -> ctx.relBuilder.avg(distinct, null, field));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using map builder

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


AGG_FUNCTION_MAP.put(
COUNT,
(distinct, field, argList, ctx) ->
ctx.relBuilder.count(
distinct, null, field == null ? ImmutableList.of() : ImmutableList.of(field)));

AGG_FUNCTION_MAP.put(
SUM, (distinct, field, argList, ctx) -> ctx.relBuilder.sum(distinct, null, field));

AGG_FUNCTION_MAP.put(
VARSAMP,
(distinct, field, argList, ctx) -> ctx.relBuilder.aggregateCall(VAR_SAMP_NULLABLE, field));

AGG_FUNCTION_MAP.put(
VARPOP,
(distinct, field, argList, ctx) -> ctx.relBuilder.aggregateCall(VAR_POP_NULLABLE, field));

AGG_FUNCTION_MAP.put(
STDDEV_SAMP,
(distinct, field, argList, ctx) ->
ctx.relBuilder.aggregateCall(STDDEV_SAMP_NULLABLE, field));

AGG_FUNCTION_MAP.put(
STDDEV_POP,
(distinct, field, argList, ctx) ->
ctx.relBuilder.aggregateCall(STDDEV_POP_NULLABLE, field));

AGG_FUNCTION_MAP.put(
TAKE,
(distinct, field, argList, ctx) ->
TransferUserDefinedAggFunction(
TakeAggFunction.class,
"TAKE",
UserDefinedFunctionUtils.getReturnTypeInferenceForArray(),
List.of(field),
argList,
ctx.relBuilder));

AGG_FUNCTION_MAP.put(
PERCENTILE_APPROX,
(distinct, field, argList, ctx) -> {
List<RexNode> newArgList = new ArrayList<>(argList);
newArgList.add(ctx.rexBuilder.makeFlag(field.getType().getSqlTypeName()));
return TransferUserDefinedAggFunction(
PercentileApproxFunction.class,
"percentile_approx",
ReturnTypes.ARG0_FORCE_NULLABLE,
List.of(field),
newArgList,
ctx.relBuilder);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@
import static org.apache.calcite.rex.RexWindowBounds.UNBOUNDED_PRECEDING;
import static org.apache.calcite.rex.RexWindowBounds.following;
import static org.apache.calcite.rex.RexWindowBounds.preceding;
import static org.opensearch.sql.calcite.utils.CalciteToolsHelper.STDDEV_POP_NULLABLE;
import static org.opensearch.sql.calcite.utils.CalciteToolsHelper.STDDEV_SAMP_NULLABLE;
import static org.opensearch.sql.calcite.utils.CalciteToolsHelper.VAR_POP_NULLABLE;
import static org.opensearch.sql.calcite.utils.CalciteToolsHelper.VAR_SAMP_NULLABLE;
import static org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils.TransferUserDefinedAggFunction;
import static org.opensearch.sql.calcite.udf.udaf.AggTransferFunctionMap.AGG_FUNCTION_MAP;

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
Expand All @@ -25,16 +21,14 @@
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.rex.RexWindowBound;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.RelBuilder;
import org.opensearch.sql.ast.expression.IntervalUnit;
import org.opensearch.sql.ast.expression.SpanUnit;
import org.opensearch.sql.ast.expression.WindowBound;
import org.opensearch.sql.ast.expression.WindowFrame;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.calcite.udf.udaf.PercentileApproxFunction;
import org.opensearch.sql.calcite.udf.udaf.TakeAggFunction;
import org.opensearch.sql.calcite.udf.udaf.AggHandler;
import org.opensearch.sql.expression.function.BuiltinFunctionName;

public interface PlanUtils {
Expand Down Expand Up @@ -226,56 +220,13 @@ static RelBuilder.AggCall makeAggCall(
boolean distinct,
RexNode field,
List<RexNode> argList) {
switch (functionName) {
case MAX:
return context.relBuilder.max(field);
case MIN:
return context.relBuilder.min(field);
case AVG:
return context.relBuilder.avg(distinct, null, field);
case COUNT:
return context.relBuilder.count(
distinct, null, field == null ? ImmutableList.of() : ImmutableList.of(field));
case SUM:
return context.relBuilder.sum(distinct, null, field);
// case MEAN:
// throw new UnsupportedOperationException("MEAN is not supported in PPL");
// case STDDEV:
// return context.relBuilder.aggregateCall(SqlStdOperatorTable.STDDEV,
// field);
case VARSAMP:
return context.relBuilder.aggregateCall(VAR_SAMP_NULLABLE, field);
case VARPOP:
return context.relBuilder.aggregateCall(VAR_POP_NULLABLE, field);
case STDDEV_POP:
return context.relBuilder.aggregateCall(STDDEV_POP_NULLABLE, field);
case STDDEV_SAMP:
return context.relBuilder.aggregateCall(STDDEV_SAMP_NULLABLE, field);
// case PERCENTILE_APPROX:
// return
// context.relBuilder.aggregateCall(SqlStdOperatorTable.PERCENTILE_CONT, field);
case TAKE:
return TransferUserDefinedAggFunction(
TakeAggFunction.class,
"TAKE",
UserDefinedFunctionUtils.getReturnTypeInferenceForArray(),
List.of(field),
argList,
context.relBuilder);
case PERCENTILE_APPROX:
List<RexNode> newArgList = new ArrayList<>(argList);
newArgList.add(context.rexBuilder.makeFlag(field.getType().getSqlTypeName()));
return TransferUserDefinedAggFunction(
PercentileApproxFunction.class,
"percentile_approx",
ReturnTypes.ARG0_FORCE_NULLABLE,
List.of(field),
newArgList,
context.relBuilder);
default:
throw new UnsupportedOperationException(
"Unexpected aggregation: " + functionName.getName().getFunctionName());
AggHandler handler = AGG_FUNCTION_MAP.get(functionName);

if (handler == null) {
throw new UnsupportedOperationException("Unexpected aggregation: " + functionName);
}

return handler.apply(distinct, field, argList, context);
}

/** Get all uniq input references from a RexNode. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public static RelBuilder.AggCall TransferUserDefinedAggFunction(
return relBuilder.aggregateCall(sqlUDAF, addArgList);
}

static SqlReturnTypeInference getReturnTypeInferenceForArray() {
public static SqlReturnTypeInference getReturnTypeInferenceForArray() {
return opBinding -> {
RelDataTypeFactory typeFactory = opBinding.getTypeFactory();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ public enum BuiltinFunctionName {
TAKE(FunctionName.of("take")),
// t-digest percentile which is used in OpenSearch core by default.
PERCENTILE_APPROX(FunctionName.of("percentile_approx")),
DISTINCT_COUNT_APPROX(FunctionName.of("distinct_count_approx")),
// Not always an aggregation query
NESTED(FunctionName.of("nested")),

Expand Down Expand Up @@ -315,6 +316,7 @@ public enum BuiltinFunctionName {
.put("take", BuiltinFunctionName.TAKE)
.put("percentile", BuiltinFunctionName.PERCENTILE_APPROX)
.put("percentile_approx", BuiltinFunctionName.PERCENTILE_APPROX)
.put("distinct_count_approx", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
.build();

private static final Map<String, BuiltinFunctionName> WINDOW_FUNC_MAPPING =
Expand Down
20 changes: 20 additions & 0 deletions docs/user/ppl/cmd/stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,26 @@ Example::
| 2.8613807855648994 |
+--------------------+

DISTINCT_COUNT_APPROX
----------

Description
>>>>>>>>>>>
Comment thread
LantaoJin marked this conversation as resolved.

Version: 3.1.0

Usage: DISTINCT_COUNT_APPROX(expr). Return the approximate distinct count value of the expr, using the hyperloglog++ algorithm.

Example::

PPL> source=accounts | stats distinct_count_approx(gender);
fetched rows / total rows = 1/1
+-------------------------------+
| distinct_count_approx(gender) |
|-------------------------------|
| 2 |
+-------------------------------+

TAKE
----------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.Arrays;
import java.util.List;
import org.json.JSONObject;
import org.junit.Ignore;
import org.junit.jupiter.api.Test;
import org.opensearch.client.Request;

Expand Down Expand Up @@ -515,21 +514,35 @@ public void testCountDistinct() {
}

@Test
public void testCountDistinctWithAlias() {
public void testCountDistinctApprox() {
JSONObject actual =
executeQuery(
String.format(
"source=%s | stats distinct_count(state) as dc by gender", TEST_INDEX_BANK));
verifySchema(actual, schema("gender", "string"), schema("dc", "long"));
"source=%s | stats distinct_count_approx(state) by gender", TEST_INDEX_BANK));
verifySchema(
actual, schema("gender", "string"), schema("distinct_count_approx(state)", "long"));
verifyDataRows(actual, rows(3, "F"), rows(4, "M"));
}

@Ignore("https://github.com/opensearch-project/sql/issues/3353")
public void testApproxCountDistinct() {
@Test
public void testCountDistinctApproxWithAlias() {
JSONObject actual =
executeQuery(
String.format(
"source=%s | stats distinct_count_approx(state) by gender", TEST_INDEX_BANK));
"source=%s | stats distinct_count_approx(state) as dca by gender",
TEST_INDEX_BANK));
verifySchema(actual, schema("gender", "string"), schema("dca", "long"));
verifyDataRows(actual, rows(3, "F"), rows(4, "M"));
}

@Test
public void testCountDistinctWithAlias() {
JSONObject actual =
executeQuery(
String.format(
"source=%s | stats distinct_count(state) as dc by gender", TEST_INDEX_BANK));
verifySchema(actual, schema("gender", "string"), schema("dc", "long"));
verifyDataRows(actual, rows(3, "F"), rows(4, "M"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.sql.opensearch.executor;

import static org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.convertRelDataTypeToExprType;
import static org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils.TransferUserDefinedAggFunction;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.DISTINCT_COUNT_APPROX;

import java.security.AccessController;
import java.security.PrivilegedAction;
Expand All @@ -25,8 +27,11 @@
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.runtime.Hook;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.type.ReturnTypes;
import org.opensearch.sql.ast.statement.Explain.ExplainFormat;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.calcite.udf.udaf.AggTransferFunctionMap;
import org.opensearch.sql.calcite.udf.udaf.AggTransferFunctionMap.*;
import org.opensearch.sql.calcite.utils.CalciteToolsHelper.OpenSearchRelRunners;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.data.model.ExprTupleValue;
Expand All @@ -41,6 +46,7 @@
import org.opensearch.sql.expression.function.PPLFuncImpTable;
import org.opensearch.sql.opensearch.client.OpenSearchClient;
import org.opensearch.sql.opensearch.executor.protector.ExecutionProtector;
import org.opensearch.sql.opensearch.functions.DistinctCountApproxAggFunction;
import org.opensearch.sql.opensearch.functions.GeoIpFunction;
import org.opensearch.sql.opensearch.util.JdbcOpenSearchDataTypeConvertor;
import org.opensearch.sql.planner.physical.PhysicalPlan;
Expand Down Expand Up @@ -246,5 +252,16 @@ private void registerOpenSearchFunctions() {
(builder, args) ->
builder.makeCall(new GeoIpFunction(client.getNodeClient()).toUDF("GEOIP"), args);
PPLFuncImpTable.INSTANCE.registerExternalFunction(BuiltinFunctionName.GEOIP, geoIpImpl);

AggTransferFunctionMap.AGG_FUNCTION_MAP.put(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think can move AGG_FUNCTION_MAP to internal of PPLFuncImpTable.
Than we could register agg by

PPLFuncImpTable.INSTANCE.registerExternalFunction(BuiltinFunctionName.DISTINCT_COUNT_APPROX, distinctCountImpl);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The interface is different. The function's return is a RexNode while aggregation return an Relbuilder.AggCall. But I will put all content inside the PPLFuncImpTable. The code architecture could be better.

DISTINCT_COUNT_APPROX,
(distinct, field, argList, ctx) ->
TransferUserDefinedAggFunction(
DistinctCountApproxAggFunction.class,
"APPROX_DISTINCT_COUNT",
ReturnTypes.BIGINT_FORCE_NULLABLE,
List.of(field),
argList,
ctx.relBuilder));
}
}
Loading
Loading