Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ dependencies {
api 'org.apache.calcite:calcite-linq4j:1.38.0'
api project(':common')
implementation "com.github.seancfoley:ipaddress:5.4.2"
implementation "com.google.zetasketch:zetasketch:0.1.0"

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the HLL library that Spark or OpenSearch uses? Do any other open-source projects use this library?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the OpenSearch docs https://docs.opensearch.org/docs/latest/aggregations/metric/cardinality/#controlling-precision, the cardinality (count distinct) aggregation already uses the HyperLogLog++ algorithm. So whenever count distinct is pushed down to DSL, the result is always approximate. Please check which HLL library OpenSearch core is using; we'd better not introduce a new dependency if core already has one. @xinyual

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. I have already changed the code to use OpenSearch core's implementation. Please take a look.


annotationProcessor('org.immutables:value:2.8.8')
compileOnly('org.immutables:value-annotations:2.8.8')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.udf.udaf;

import com.google.zetasketch.HyperLogLogPlusPlus;
import org.opensearch.sql.calcite.udf.UserDefinedAggFunction;

/**
 * Aggregate function that estimates how many distinct values it has been given, backed by a
 * HyperLogLog++ sketch.
 *
 * <p>Each non-null input is converted with {@code toString()} before it is handed to the sketch,
 * so the sketch only ever sees the string form of a value.
 */
public class DistinctCountApproxAggFunction
    implements UserDefinedAggFunction<DistinctCountApproxAggFunction.HLLAccumulator> {

  /** Creates a fresh, empty accumulator. */
  @Override
  public HLLAccumulator init() {
    return new HLLAccumulator();
  }

  /** Returns whatever {@code accumulator.value()} currently reports. */
  @Override
  public Object result(HLLAccumulator accumulator) {
    return accumulator.value();
  }

  /**
   * Feeds every non-null entry of {@code values} into {@code acc}; null entries are skipped.
   *
   * @param acc accumulator to update
   * @param values candidate values for the current row
   * @return the same accumulator instance that was passed in
   */
  @Override
  public HLLAccumulator add(HLLAccumulator acc, Object... values) {
    for (int i = 0; i < values.length; i++) {
      Object candidate = values[i];
      if (candidate == null) {
        continue;
      }
      acc.add(candidate.toString());
    }
    return acc;
  }

  /** Accumulator that wraps a HyperLogLog++ sketch built for string inputs. */
  public static class HLLAccumulator implements UserDefinedAggFunction.Accumulator {
    private final HyperLogLogPlusPlus<String> sketch =
        new HyperLogLogPlusPlus.Builder().buildForStrings();

    /** Records one string value in the sketch. */
    public void add(String value) {
      sketch.add(value);
    }

    /** Returns the sketch's current result; {@code args} is not used. */
    @Override
    public Object value(Object... args) {
      return sketch.result();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.opensearch.sql.ast.expression.WindowBound;
import org.opensearch.sql.ast.expression.WindowFrame;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.calcite.udf.udaf.DistinctCountApproxAggFunction;
import org.opensearch.sql.calcite.udf.udaf.PercentileApproxFunction;
import org.opensearch.sql.calcite.udf.udaf.TakeAggFunction;
import org.opensearch.sql.expression.function.BuiltinFunctionName;
Expand Down Expand Up @@ -238,6 +239,14 @@ static RelBuilder.AggCall makeAggCall(
// case PERCENTILE_APPROX:
// return
// context.relBuilder.aggregateCall(SqlStdOperatorTable.PERCENTILE_CONT, field);
case DISTINCT_COUNT_APPROX:
return TransferUserDefinedAggFunction(
DistinctCountApproxAggFunction.class,
"APPROX_DISTINCT_COUNT",
ReturnTypes.BIGINT_FORCE_NULLABLE,
List.of(field),
argList,
context.relBuilder);
case TAKE:
return TransferUserDefinedAggFunction(
TakeAggFunction.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ public enum BuiltinFunctionName {
TAKE(FunctionName.of("take")),
// t-digest percentile which is used in OpenSearch core by default.
PERCENTILE_APPROX(FunctionName.of("percentile_approx")),
DISTINCT_COUNT_APPROX(FunctionName.of("distinct_count_approx")),
// Not always an aggregation query
NESTED(FunctionName.of("nested")),

Expand Down Expand Up @@ -310,6 +311,7 @@ public enum BuiltinFunctionName {
.put("take", BuiltinFunctionName.TAKE)
.put("percentile", BuiltinFunctionName.PERCENTILE_APPROX)
.put("percentile_approx", BuiltinFunctionName.PERCENTILE_APPROX)
.put("distinct_count_approx", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
.build();

private static final Map<String, BuiltinFunctionName> WINDOW_FUNC_MAPPING =
Expand Down
18 changes: 18 additions & 0 deletions docs/user/ppl/cmd/stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,24 @@ Example::
| 2.8613807855648994 |
+--------------------+

DISTINCT_COUNT_APPROX
---------------------

Description
>>>>>>>>>>>
Comment thread
LantaoJin marked this conversation as resolved.

Usage: DISTINCT_COUNT_APPROX(expr). Returns the approximate number of distinct values of expr, computed with the HyperLogLog++ algorithm.

Example::

PPL> source=accounts | stats distinct_count_approx(gender);
fetched rows / total rows = 1/1
+-------------------------------+
| distinct_count_approx(gender) |
|-------------------------------|
| 2 |
+-------------------------------+

TAKE
----------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.Arrays;
import java.util.List;
import org.json.JSONObject;
import org.junit.Ignore;
import org.junit.jupiter.api.Test;
import org.opensearch.client.Request;

Expand Down Expand Up @@ -515,21 +514,35 @@ public void testCountDistinct() {
}

@Test
public void testCountDistinctWithAlias() {
public void testCountDistinctApprox() {
JSONObject actual =
executeQuery(
String.format(
"source=%s | stats distinct_count(state) as dc by gender", TEST_INDEX_BANK));
verifySchema(actual, schema("gender", "string"), schema("dc", "long"));
"source=%s | stats distinct_count_approx(state) by gender", TEST_INDEX_BANK));
verifySchema(
actual, schema("gender", "string"), schema("distinct_count_approx(state)", "long"));
verifyDataRows(actual, rows(3, "F"), rows(4, "M"));
}

@Ignore("https://github.com/opensearch-project/sql/issues/3353")
public void testApproxCountDistinct() {
@Test
public void testCountDistinctApproxWithAlias() {
JSONObject actual =
executeQuery(
String.format(
"source=%s | stats distinct_count_approx(state) by gender", TEST_INDEX_BANK));
"source=%s | stats distinct_count_approx(state) as dca by gender",
TEST_INDEX_BANK));
verifySchema(actual, schema("gender", "string"), schema("dca", "long"));
verifyDataRows(actual, rows(3, "F"), rows(4, "M"));
}

// distinct_count with an alias: the aggregate column is reported under the alias "dc".
@Test
public void testCountDistinctWithAlias() {
  String ppl =
      String.format("source=%s | stats distinct_count(state) as dc by gender", TEST_INDEX_BANK);
  JSONObject response = executeQuery(ppl);
  verifySchema(response, schema("gender", "string"), schema("dc", "long"));
  verifyDataRows(response, rows(3, "F"), rows(4, "M"));
}

@Test
Expand Down
1 change: 1 addition & 0 deletions ppl/src/main/antlr/OpenSearchPPLLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ BIT_XOR_OP: '^';
AVG: 'AVG';
COUNT: 'COUNT';
DISTINCT_COUNT: 'DISTINCT_COUNT';
DISTINCT_COUNT_APPROX: 'DISTINCT_COUNT_APPROX';
ESTDC: 'ESTDC';
ESTDC_ERROR: 'ESTDC_ERROR';
MAX: 'MAX';
Expand Down
3 changes: 2 additions & 1 deletion ppl/src/main/antlr/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ statsAggTerm
statsFunction
: statsFunctionName LT_PRTHS valueExpression RT_PRTHS # statsFunctionCall
| COUNT LT_PRTHS RT_PRTHS # countAllFunctionCall
| (DISTINCT_COUNT | DC) LT_PRTHS valueExpression RT_PRTHS # distinctCountFunctionCall
| (DISTINCT_COUNT | DC | DISTINCT_COUNT_APPROX) LT_PRTHS valueExpression RT_PRTHS # distinctCountFunctionCall
| takeAggFunction # takeAggFunctionCall
| percentileApproxFunction # percentileApproxFunctionCall
;
Expand Down Expand Up @@ -1096,6 +1096,7 @@ keywordsCanBeId
| statsFunctionName
| windowFunctionName
| DISTINCT_COUNT
| DISTINCT_COUNT_APPROX
| ESTDC
| ESTDC_ERROR
| MEAN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ public UnresolvedExpression visitCountAllFunctionCall(CountAllFunctionCallContex

@Override
public UnresolvedExpression visitDistinctCountFunctionCall(DistinctCountFunctionCallContext ctx) {
return new AggregateFunction("count", visit(ctx.valueExpression()), true);
String funcName = ctx.DISTINCT_COUNT_APPROX() != null ? "distinct_count_approx" : "count";
return new AggregateFunction(funcName, visit(ctx.valueExpression()), true);
}

@Override
Expand Down
Loading