Merged
Changes from 4 commits
@@ -30,6 +30,7 @@ public enum Key {
PATTERN_MAX_SAMPLE_COUNT("plugins.ppl.pattern.max.sample.count"),
PATTERN_BUFFER_LIMIT("plugins.ppl.pattern.buffer.limit"),
PPL_REX_MAX_MATCH_LIMIT("plugins.ppl.rex.max_match.limit"),
PPL_VALUES_MAX_LIMIT("plugins.ppl.values.max.limit"),
PPL_SYNTAX_LEGACY_PREFERRED("plugins.ppl.syntax.legacy.preferred"),

/** Enable Calcite as execution engine */
@@ -21,9 +21,8 @@
* <li>Order of values in the result is non-deterministic
* </ul>
*
- * <p>Note: Similar to the TAKE function, LIST does not guarantee any specific order of values in
- * the result array. The order may vary between executions and depends on the underlying query
- * execution plan and optimizations.
+ * <p>LIST does not guarantee any specific order of values in the result array. The order may vary
+ * between executions and depends on the underlying query execution plan and optimizations.
*/
public class ListAggFunction implements UserDefinedAggFunction<ListAggFunction.ListAccumulator> {

@@ -0,0 +1,51 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.calcite.udf.udaf;

import org.opensearch.sql.common.setting.Settings;

/**
 * Holder class to provide static access to Settings for UDAF functions. Since Calcite UDAF
 * functions are instantiated via reflection with default constructor, we need this static holder to
 * access settings.
 */
public class SettingsHolder {
  private static volatile Settings settings;

  /** Private constructor to prevent instantiation */
  private SettingsHolder() {}

  /**
   * Set the settings instance. This should be called during plugin initialization.
   *
   * @param s Settings instance
   */
  public static void setSettings(Settings s) {
    settings = s;
  }

  /**
   * Get the settings instance.
   *
   * @return Settings instance or null if not initialized
   */
  public static Settings getSettings() {
    return settings;
  }

  /**
   * Get the maximum limit for VALUES aggregate function.
   *
   * @return Maximum limit (0 means unlimited)
   */
  public static int getValuesMaxLimit() {
    if (settings != null) {
      Integer limit = settings.getSettingValue(Settings.Key.PPL_VALUES_MAX_LIMIT);
      return limit != null ? limit : 0;
    }
    return 0; // Default when settings not available
  }
}
@@ -0,0 +1,100 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.calcite.udf.udaf;

import java.util.ArrayList;
import java.util.Set;
import java.util.TreeSet;
import org.opensearch.sql.calcite.udf.UserDefinedAggFunction;

/**
 * VALUES aggregate function implementation. Returns distinct values from a field in lexicographical
 * order as a multivalue field.
 *
 * <p>Behavior:
 *
 * <ul>
 *   <li>Returns unique values only (no duplicates)
 *   <li>Values are sorted in lexicographical order
 *   <li>Processes field values as strings (casts all inputs to strings)
 *   <li>Configurable limit via plugins.ppl.values.max.limit setting (0 = unlimited)
 *   <li>Supports only scalar data types (rejects STRUCT/ARRAY types)
 *   <li>Implementation uses TreeSet for automatic sorting and deduplication
 * </ul>
 */
public class ValuesAggFunction
    implements UserDefinedAggFunction<ValuesAggFunction.ValuesAccumulator> {

  @Override
  public ValuesAccumulator init() {
    return new ValuesAccumulator();
  }

  @Override
  public Object result(ValuesAccumulator accumulator) {
    return accumulator.value();
  }

  @Override
  public ValuesAccumulator add(ValuesAccumulator acc, Object... values) {
    // Handle case where no values are passed
    if (values == null || values.length == 0) {
      return acc;
    }

    Object value = values[0];

    // Filter out null values and check limit
    int limit = getMaxValuesLimit();
    if (value != null && (limit == 0 || acc.size() < limit)) {
      // Convert value to string, handling all types safely
      String stringValue = convertToString(value);
Collaborator:

There is a null check on line 52, but convertToString(value) can also return null. Which one is correct?

Member Author:

Simplified this to String stringValue = String.valueOf(value);

Collaborator:

String.valueOf(value) will return "null". Is that expected?

Member Author:

@penghuo When the value is the string "null", we include "null" in the output. I think this is fine: these are not actual null values, they are "null" strings ingested by users.

Actual null values are handled on line 51.
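
The exchange above turns on how `String.valueOf` treats a null reference versus the four-character string "null". The following is a minimal sketch of that distinction, not the code from this PR; the class name `NullHandlingSketch` and its `collect` helper are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class NullHandlingSketch {
  /** Skips null references and stringifies everything else (illustrative helper). */
  static List<String> collect(Object... inputs) {
    List<String> out = new ArrayList<>();
    for (Object value : inputs) {
      if (value != null) { // the null filter runs before any string conversion
        out.add(String.valueOf(value));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Object missing = null;
    // String.valueOf(Object) maps a null reference to the text "null".
    System.out.println(String.valueOf(missing).equals("null")); // true
    // The null reference is dropped; the ingested string "null" is kept.
    System.out.println(collect("a", null, "null", 42)); // [a, null, 42]
  }
}
```

Because the null check happens first, the only "null" entries that can reach the result are strings that were stored as such.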

      acc.add(stringValue, limit);
    }

    return acc;
  }

  /** Converts any value to its string representation. */
  private String convertToString(Object value) {
Collaborator:

Values (array, object, ...) that cannot be translated to a string should be ignored instead of throwing an exception.

Member Author:

These should be automatically ignored in the function typechecker

Member Author:

For array fields and multivalue (MV) fields, I see the issue with both take and values: both return runtime exceptions.

### mv
POST http://localhost:9200/_plugins/_ppl/
Content-Type: application/json

{
    "query": "source=sample-logs | stats values(calc)"
}

###
POST http://localhost:9200/_plugins/_ppl/
Content-Type: application/json

{
    "query": "source=sample-logs | stats take(calc)"
}

Output:

HTTP/1.1 500 Internal Server Error
X-OpenSearch-Version: OpenSearch/3.2.0-SNAPSHOT (opensearch)
content-type: text/plain; charset=UTF-8
content-encoding: gzip
content-length: 248

{
  "error": {
    "reason": "There was internal problem at backend",
    "details": "java.sql.SQLException: exception while executing query: class java.util.ArrayList cannot be cast to class java.lang.Long (java.util.ArrayList and java.lang.Long are in module java.base of loader 'bootstrap')",
    "type": "RuntimeException"
  },
  "status": 500
}

Collaborator:

Thanks. Tracking this with schemaless issue #3995.

    if (value == null) {
      return null;
    }
    return String.valueOf(value);
  }

  public static class ValuesAccumulator implements Accumulator {
    private final Set<String> values;

    public ValuesAccumulator() {
      this.values = new TreeSet<>(); // TreeSet maintains sorted order and uniqueness
    }

    @Override
    public Object value(Object... argList) {
      return new ArrayList<>(values); // Return List<String> to match expected type
    }

    public void add(String value, int limit) {
      if (limit == 0 || values.size() < limit) {
        values.add(value);
      }
    }

    public int size() {
      return values.size();
    }
  }

  /**
   * Get the maximum limit for values from settings.
   *
   * @return Maximum limit (0 means unlimited)
   */
  private int getMaxValuesLimit() {
    return SettingsHolder.getValuesMaxLimit();
  }
Collaborator:

Avoid fetching from settings; follow the TakeAggFunction.class implementation.

Member Author:

Updated with the AST implementation: cb5dcdf

}
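
The last review thread asks for the limit to reach the function through the call instead of a static settings lookup. The sketch below shows one way that could look, assuming (hypothetically) that the planner passes the limit as an optional second argument. The class name and calling convention are illustrative only; they are not taken from commit cb5dcdf or from TakeAggFunction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class ValuesWithLimitArgSketch {
  private final TreeSet<String> values = new TreeSet<>();

  /**
   * Hypothetical convention: args[0] is the value, optional args[1] is the limit
   * (0 or absent means unlimited).
   */
  void add(Object... args) {
    if (args == null || args.length == 0 || args[0] == null) {
      return; // nothing to collect
    }
    int limit = (args.length > 1 && args[1] instanceof Number) ? ((Number) args[1]).intValue() : 0;
    if (limit == 0 || values.size() < limit) {
      values.add(String.valueOf(args[0]));
    }
  }

  /** Returns the collected values in sorted order. */
  List<String> result() {
    return new ArrayList<>(values);
  }
}
```

With this shape the accumulator has no dependency on global state, so it can be exercised in a unit test without initializing plugin settings.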
@@ -207,6 +207,7 @@ public enum BuiltinFunctionName {

// Multivalue aggregation function
LIST(FunctionName.of("list")),
VALUES(FunctionName.of("values")),
// Not always an aggregation query
NESTED(FunctionName.of("nested")),
// Document order aggregation functions
@@ -364,6 +365,7 @@ public enum BuiltinFunctionName {
.put("latest", BuiltinFunctionName.LATEST)
.put("distinct_count_approx", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
.put("list", BuiltinFunctionName.LIST)
.put("values", BuiltinFunctionName.VALUES)
.put("pattern", BuiltinFunctionName.INTERNAL_PATTERN)
.put("first", BuiltinFunctionName.FIRST)
.put("last", BuiltinFunctionName.LAST)
@@ -36,6 +36,7 @@
import org.opensearch.sql.calcite.udf.udaf.NullableSqlAvgAggFunction;
import org.opensearch.sql.calcite.udf.udaf.PercentileApproxFunction;
import org.opensearch.sql.calcite.udf.udaf.TakeAggFunction;
import org.opensearch.sql.calcite.udf.udaf.ValuesAggFunction;
import org.opensearch.sql.calcite.utils.PPLOperandTypes;
import org.opensearch.sql.calcite.utils.PPLReturnTypes;
import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils;
@@ -450,6 +451,12 @@ public class PPLBuiltinOperators extends ReflectiveSqlOperatorTable {
public static final SqlAggFunction LIST =
createUserDefinedAggFunction(
ListAggFunction.class, "LIST", PPLReturnTypes.STRING_ARRAY, PPLOperandTypes.ANY_SCALAR);
public static final SqlAggFunction VALUES =
createUserDefinedAggFunction(
ValuesAggFunction.class,
"VALUES",
PPLReturnTypes.STRING_ARRAY,
PPLOperandTypes.ANY_SCALAR);

public static final SqlOperator ENHANCED_COALESCE =
new EnhancedCoalesceFunction().toUDF("COALESCE");
@@ -217,6 +217,7 @@
import static org.opensearch.sql.expression.function.BuiltinFunctionName.UTC_DATE;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.UTC_TIME;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.UTC_TIMESTAMP;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.VALUES;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.VARPOP;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.VARSAMP;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.WEEK;
@@ -1120,6 +1121,7 @@ void populate() {
registerOperator(TAKE, PPLBuiltinOperators.TAKE);
registerOperator(INTERNAL_PATTERN, PPLBuiltinOperators.INTERNAL_PATTERN);
registerOperator(LIST, PPLBuiltinOperators.LIST);
registerOperator(VALUES, PPLBuiltinOperators.VALUES);

register(
AVG,
82 changes: 82 additions & 0 deletions docs/user/ppl/admin/settings.rst
@@ -226,3 +226,85 @@ PPL query::
}
}
}

plugins.ppl.values.max.limit
============================

Description
-----------

This setting controls the maximum number of unique values that the ``VALUES`` aggregation function can return. When set to 0 (the default), there is no limit on the number of unique values returned. When set to a positive integer, the function will return at most that many unique values.

1. The default value is 0 (unlimited).
2. This setting is node scope.
3. This setting can be updated dynamically.

The ``VALUES`` function collects all unique values from a field and returns them in lexicographical order. This setting helps manage memory usage by limiting the number of values collected.
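
One consequence of the limit is worth spelling out. If the cap is applied while values are being accumulated, as in the `add` method shown earlier in this diff, then the function keeps the first N distinct values it encounters and sorts those. It does not return the N smallest values overall. The following is a minimal sketch under that assumption; `ValuesLimitSketch` and `collect` are hypothetical names, not part of the plugin.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class ValuesLimitSketch {
  /** Collects up to {@code limit} distinct values (0 = unlimited) and returns them sorted. */
  static List<String> collect(List<String> input, int limit) {
    TreeSet<String> seen = new TreeSet<>();
    for (String v : input) {
      // Same guard as the diff: skip nulls, stop admitting values once the cap is reached.
      if (v != null && (limit == 0 || seen.size() < limit)) {
        seen.add(v);
      }
    }
    return new ArrayList<>(seen);
  }

  public static void main(String[] args) {
    List<String> rows = List.of("z", "y", "a", "b");
    System.out.println(collect(rows, 0)); // [a, b, y, z]
    System.out.println(collect(rows, 2)); // [y, z]
  }
}
```

With a limit of 2 the result is `[y, z]`, because "z" and "y" arrived first. Which values survive therefore depends on the order in which rows reach the aggregation.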

Example 1
---------

Set the limit to 1000 unique values:

PPL query::

    sh$ curl -sS -H 'Content-Type: application/json' \
    ... -X PUT localhost:9200/_plugins/_query/settings \
    ... -d '{"transient" : {"plugins.ppl.values.max.limit" : "1000"}}'
    {
      "acknowledged": true,
      "persistent": {},
      "transient": {
        "plugins": {
          "ppl": {
            "values": {
              "max": {
                "limit": "1000"
              }
            }
          }
        }
      }
    }

Example 2
---------

Reset to default (unlimited) by setting to null:

PPL query::

    sh$ curl -sS -H 'Content-Type: application/json' \
    ... -X PUT localhost:9200/_plugins/_query/settings \
    ... -d '{"transient" : {"plugins.ppl.values.max.limit" : null}}'
    {
      "acknowledged": true,
      "persistent": {},
      "transient": {}
    }

Example 3
---------

Set to 0 explicitly for unlimited values:

PPL query::

    sh$ curl -sS -H 'Content-Type: application/json' \
    ... -X PUT localhost:9200/_plugins/_query/settings \
    ... -d '{"transient" : {"plugins.ppl.values.max.limit" : "0"}}'
    {
      "acknowledged": true,
      "persistent": {},
      "transient": {
        "plugins": {
          "ppl": {
            "values": {
              "max": {
                "limit": "0"
              }
            }
          }
        }
      }
    }
66 changes: 66 additions & 0 deletions docs/user/ppl/cmd/stats.rst
@@ -34,6 +34,8 @@ The following table dataSources the aggregation functions and also indicates how
+----------+-------------+-------------+
| LIST | Ignore | Ignore |
+----------+-------------+-------------+
| VALUES | Ignore | Ignore |
+----------+-------------+-------------+


Syntax
@@ -577,6 +579,56 @@ Example with result field rename::
| ["Amber","Hattie","Nanette","Dale"] |
+-------------------------------------+

VALUES
------

Description
>>>>>>>>>>>

Version: 3.3.0 (Calcite engine only)

Usage: VALUES(expr). Collects all unique values from the specified expression into a sorted array. Values are converted to strings, nulls are filtered, and duplicates are removed.

The maximum number of unique values returned is controlled by the ``plugins.ppl.values.max.limit`` setting:
* Default value is 0, which means unlimited values are returned
* Can be configured to any positive integer to limit the number of unique values
Collaborator:

Member Author:

Updated here: 50a4bf8

* See the `PPL Settings <../admin/settings.rst#plugins-ppl-values-max-limit>`_ documentation for more details

* expr: The field expression to collect unique values from.
* This aggregation function doesn't support Array, Struct, Object field types.
* Returns distinct values only (no duplicates)
Collaborator:

Duplicate of line 590.

Member Author:

Yes, removed it here: 50a4bf8

* Values are sorted in lexicographical order

Example with string fields::

    PPL> source=accounts | stats values(firstname);
    fetched rows / total rows = 1/1
    +-------------------------------------+
    | values(firstname)                   |
    |-------------------------------------|
    | ["Amber","Dale","Hattie","Nanette"] |
    +-------------------------------------+

Example with numeric fields (sorted as strings)::

    PPL> source=accounts | stats values(age);
    fetched rows / total rows = 1/1
    +----------------------------+
    | values(age)                |
    |----------------------------|
    | ["28","32","33","36","39"] |
    +----------------------------+
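
The ages in the example above are all two digits, so string order and numeric order happen to coincide. With mixed digit counts they diverge. The following is a minimal sketch of that effect, assuming (as the class Javadoc in this diff states) that values are cast to strings and held in a `TreeSet`; `LexicographicOrderSketch` and `sortedDistinct` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class LexicographicOrderSketch {
  /** Stringifies each input, then deduplicates and sorts by String natural order. */
  static List<String> sortedDistinct(List<?> input) {
    TreeSet<String> set = new TreeSet<>();
    for (Object v : input) {
      set.add(String.valueOf(v));
    }
    return new ArrayList<>(set);
  }

  public static void main(String[] args) {
    // Compared character by character, "10" and "100" sort before "28", and "9" sorts last.
    System.out.println(sortedDistinct(List.of(9, 10, 100, 28, 9))); // [10, 100, 28, 9]
  }
}
```

Callers that need numeric order would have to convert and re-sort the returned array themselves.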

Example with result field rename::

    PPL> source=accounts | stats values(firstname) as unique_names;
    fetched rows / total rows = 1/1
    +-------------------------------------+
    | unique_names                        |
    |-------------------------------------|
    | ["Amber","Dale","Hattie","Nanette"] |
    +-------------------------------------+

Example 1: Calculate the count of events
========================================

@@ -833,3 +885,17 @@ PPL query::
| 1 | [email protected] |
+-----+-----------------------+

Example 16: Collect unique values in a field using VALUES
==========================================================

The example shows how to collect all unique firstname values, sorted lexicographically with duplicates removed.

PPL query::

    PPL> source=accounts | stats values(firstname);
    fetched rows / total rows = 1/1
    +-------------------------------------+
    | values(firstname)                   |
    |-------------------------------------|
    | ["Amber","Dale","Hattie","Nanette"] |
    +-------------------------------------+