diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index 069a9d18ee2..f2002e8f283 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -30,6 +30,7 @@ public enum Key { PATTERN_MAX_SAMPLE_COUNT("plugins.ppl.pattern.max.sample.count"), PATTERN_BUFFER_LIMIT("plugins.ppl.pattern.buffer.limit"), PPL_REX_MAX_MATCH_LIMIT("plugins.ppl.rex.max_match.limit"), + PPL_VALUES_MAX_LIMIT("plugins.ppl.values.max.limit"), PPL_SYNTAX_LEGACY_PREFERRED("plugins.ppl.syntax.legacy.preferred"), /** Enable Calcite as execution engine */ diff --git a/core/src/main/java/org/opensearch/sql/calcite/udf/udaf/ListAggFunction.java b/core/src/main/java/org/opensearch/sql/calcite/udf/udaf/ListAggFunction.java index 709df157e2d..30f968fead5 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/udf/udaf/ListAggFunction.java +++ b/core/src/main/java/org/opensearch/sql/calcite/udf/udaf/ListAggFunction.java @@ -21,9 +21,8 @@ *
  • Order of values in the result is non-deterministic * * - *

    Note: Similar to the TAKE function, LIST does not guarantee any specific order of values in - * the result array. The order may vary between executions and depends on the underlying query - * execution plan and optimizations. + *

    LIST does not guarantee any specific order of values in the result array. The order may vary + * between executions and depends on the underlying query execution plan and optimizations. */ public class ListAggFunction implements UserDefinedAggFunction { diff --git a/core/src/main/java/org/opensearch/sql/calcite/udf/udaf/ValuesAggFunction.java b/core/src/main/java/org/opensearch/sql/calcite/udf/udaf/ValuesAggFunction.java new file mode 100644 index 00000000000..b5d68fe7ce3 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/udf/udaf/ValuesAggFunction.java @@ -0,0 +1,88 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.udf.udaf; + +import java.util.ArrayList; +import java.util.Set; +import java.util.TreeSet; +import org.opensearch.sql.calcite.udf.UserDefinedAggFunction; + +/** + * VALUES aggregate function implementation. Returns distinct values from a field in lexicographical + * order as a multivalue field. + * + *

    Behavior: + * + *

    + */
+public class ValuesAggFunction
+    implements UserDefinedAggFunction<ValuesAggFunction.ValuesAccumulator> {
+
+  @Override
+  public ValuesAccumulator init() {
+    return new ValuesAccumulator();
+  }
+
+  @Override
+  public Object result(ValuesAccumulator accumulator) {
+    return accumulator.value();
+  }
+
+  /**
+   * Adds {@code values[0]}, converted to a string, to the accumulator. When present and non-null,
+   * {@code values[1]} is the maximum number of distinct values to keep; 0 means unlimited.
+   */
+  @Override
+  public ValuesAccumulator add(ValuesAccumulator acc, Object... values) {
+    if (values == null || values.length == 0 || values[0] == null) {
+      return acc; // nothing to aggregate: no operand, or a null field value
+    }
+    int limit = 0;
+    if (values.length > 1 && values[1] != null) {
+      // Widened from an Integer cast so any integral Number type is accepted.
+      limit = ((Number) values[1]).intValue();
+    }
+    // Skip the string conversion once the accumulator is full.
+    if (acc.hasCapacity(limit)) {
+      acc.add(String.valueOf(values[0]), limit);
+    }
+    return acc;
+  }
+
+  /** Distinct string values of one group; the backing TreeSet keeps them sorted. */
+  public static class ValuesAccumulator implements Accumulator {
+    private final Set<String> values = new TreeSet<>();
+
+    /** Returns the collected values as a new list, in the set's iteration order. */
+    @Override
+    public Object value(Object... argList) {
+      return new ArrayList<>(values);
+    }
+
+    /** Stores {@code value} unless {@code limit} distinct values are already held. */
+    public void add(String value, int limit) {
+      if (hasCapacity(limit)) {
+        values.add(value);
+      }
+    }
+
+    /** Whether one more distinct value may be stored; a limit of 0 means unlimited. */
+    public boolean hasCapacity(int limit) {
+      return limit == 0 || values.size() < limit;
+    }
+
+    public int size() {
+      return values.size();
+    }
+  }
+}
diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/PPLOperandTypes.java b/core/src/main/java/org/opensearch/sql/calcite/utils/PPLOperandTypes.java
index 0d343711b3f..deb29ca2f6f 100644
--- a/core/src/main/java/org/opensearch/sql/calcite/utils/PPLOperandTypes.java
+++ b/core/src/main/java/org/opensearch/sql/calcite/utils/PPLOperandTypes.java
@@ -22,6 +22,45 @@ public class PPLOperandTypes {
   // This class is not meant to be instantiated.
private PPLOperandTypes() {} + /** List of all scalar type signatures (single parameter each) */ + private static final java.util.List> + SCALAR_TYPES = + java.util.List.of( + // Numeric types + java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.BYTE), + java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.SHORT), + java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.INTEGER), + java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.LONG), + java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.FLOAT), + java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.DOUBLE), + // String type + java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.STRING), + // Boolean type + java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.BOOLEAN), + // Temporal types + java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.DATE), + java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.TIME), + java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.TIMESTAMP), + // Special scalar types + java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.IP), + java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.BINARY)); + + /** Helper method to create scalar types with optional integer parameter */ + private static java.util.List> + createScalarWithOptionalInteger() { + java.util.List> result = + new java.util.ArrayList<>(SCALAR_TYPES); + + // Add scalar + integer combinations + SCALAR_TYPES.forEach( + scalarType -> + result.add( + java.util.List.of( + scalarType.get(0), org.opensearch.sql.data.type.ExprCoreType.INTEGER))); + + return result; + } + public static final UDFOperandMetadata NONE = UDFOperandMetadata.wrap(OperandTypes.family()); public static final UDFOperandMetadata OPTIONAL_ANY = UDFOperandMetadata.wrap( @@ -200,25 +239,12 @@ private PPLOperandTypes() {} * booleans, datetime types, and special scalar types like IP and BINARY. Excludes complex types * like arrays, structs, and maps. 
*/ - public static final UDFOperandMetadata ANY_SCALAR = - UDFOperandMetadata.wrapUDT( - java.util.List.of( - // Numeric types - java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.BYTE), - java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.SHORT), - java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.INTEGER), - java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.LONG), - java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.FLOAT), - java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.DOUBLE), - // String type - java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.STRING), - // Boolean type - java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.BOOLEAN), - // Temporal types - java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.DATE), - java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.TIME), - java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.TIMESTAMP), - // Special scalar types - java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.IP), - java.util.List.of(org.opensearch.sql.data.type.ExprCoreType.BINARY))); + public static final UDFOperandMetadata ANY_SCALAR = UDFOperandMetadata.wrapUDT(SCALAR_TYPES); + + /** + * Operand type checker that accepts any scalar type with an optional integer argument. This is + * used for aggregation functions that take a field and an optional limit/size parameter. 
+ */ + public static final UDFOperandMetadata ANY_SCALAR_OPTIONAL_INTEGER = + UDFOperandMetadata.wrapUDT(createScalarWithOptionalInteger()); } diff --git a/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java b/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java index ed65a472ca3..7cfa5920603 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java @@ -207,6 +207,7 @@ public enum BuiltinFunctionName { // Multivalue aggregation function LIST(FunctionName.of("list")), + VALUES(FunctionName.of("values")), // Not always an aggregation query NESTED(FunctionName.of("nested")), // Document order aggregation functions @@ -364,6 +365,7 @@ public enum BuiltinFunctionName { .put("latest", BuiltinFunctionName.LATEST) .put("distinct_count_approx", BuiltinFunctionName.DISTINCT_COUNT_APPROX) .put("list", BuiltinFunctionName.LIST) + .put("values", BuiltinFunctionName.VALUES) .put("pattern", BuiltinFunctionName.INTERNAL_PATTERN) .put("first", BuiltinFunctionName.FIRST) .put("last", BuiltinFunctionName.LAST) diff --git a/core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java b/core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java index 055fedaec9d..e84db28655e 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java @@ -36,6 +36,7 @@ import org.opensearch.sql.calcite.udf.udaf.NullableSqlAvgAggFunction; import org.opensearch.sql.calcite.udf.udaf.PercentileApproxFunction; import org.opensearch.sql.calcite.udf.udaf.TakeAggFunction; +import org.opensearch.sql.calcite.udf.udaf.ValuesAggFunction; import org.opensearch.sql.calcite.utils.PPLOperandTypes; import org.opensearch.sql.calcite.utils.PPLReturnTypes; import 
org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils; @@ -450,6 +451,12 @@ public class PPLBuiltinOperators extends ReflectiveSqlOperatorTable { public static final SqlAggFunction LIST = createUserDefinedAggFunction( ListAggFunction.class, "LIST", PPLReturnTypes.STRING_ARRAY, PPLOperandTypes.ANY_SCALAR); + public static final SqlAggFunction VALUES = + createUserDefinedAggFunction( + ValuesAggFunction.class, + "VALUES", + PPLReturnTypes.STRING_ARRAY, + PPLOperandTypes.ANY_SCALAR_OPTIONAL_INTEGER); public static final SqlOperator ENHANCED_COALESCE = new EnhancedCoalesceFunction().toUDF("COALESCE"); diff --git a/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java b/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java index 831ed346a5a..8752a0ae822 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java @@ -217,6 +217,7 @@ import static org.opensearch.sql.expression.function.BuiltinFunctionName.UTC_DATE; import static org.opensearch.sql.expression.function.BuiltinFunctionName.UTC_TIME; import static org.opensearch.sql.expression.function.BuiltinFunctionName.UTC_TIMESTAMP; +import static org.opensearch.sql.expression.function.BuiltinFunctionName.VALUES; import static org.opensearch.sql.expression.function.BuiltinFunctionName.VARPOP; import static org.opensearch.sql.expression.function.BuiltinFunctionName.VARSAMP; import static org.opensearch.sql.expression.function.BuiltinFunctionName.WEEK; @@ -1120,6 +1121,7 @@ void populate() { registerOperator(TAKE, PPLBuiltinOperators.TAKE); registerOperator(INTERNAL_PATTERN, PPLBuiltinOperators.INTERNAL_PATTERN); registerOperator(LIST, PPLBuiltinOperators.LIST); + registerOperator(VALUES, PPLBuiltinOperators.VALUES); register( AVG, diff --git a/docs/user/ppl/admin/settings.rst b/docs/user/ppl/admin/settings.rst index 61345f0fe45..389a5c24be8 100644 --- 
a/docs/user/ppl/admin/settings.rst +++ b/docs/user/ppl/admin/settings.rst @@ -226,3 +226,85 @@ PPL query:: } } } + +plugins.ppl.values.max.limit +============================ + +Description +----------- + +This setting controls the maximum number of unique values that the ``VALUES`` aggregation function can return. When set to 0 (the default), there is no limit on the number of unique values returned. When set to a positive integer, the function will return at most that many unique values. + +1. The default value is 0 (unlimited). +2. This setting is node scope. +3. This setting can be updated dynamically. + +The ``VALUES`` function collects all unique values from a field and returns them in lexicographical order. This setting helps manage memory usage by limiting the number of values collected. + +Example 1 +--------- + +Set the limit to 1000 unique values: + +PPL query:: + + sh$ curl -sS -H 'Content-Type: application/json' \ + ... -X PUT localhost:9200/_plugins/_query/settings \ + ... -d '{"transient" : {"plugins.ppl.values.max.limit" : "1000"}}' + { + "acknowledged": true, + "persistent": {}, + "transient": { + "plugins": { + "ppl": { + "values": { + "max": { + "limit": "1000" + } + } + } + } + } + } + +Example 2 +--------- + +Reset to default (unlimited) by setting to null: + +PPL query:: + + sh$ curl -sS -H 'Content-Type: application/json' \ + ... -X PUT localhost:9200/_plugins/_query/settings \ + ... -d '{"transient" : {"plugins.ppl.values.max.limit" : null}}' + { + "acknowledged": true, + "persistent": {}, + "transient": {} + } + +Example 3 +--------- + +Set to 0 explicitly for unlimited values: + +PPL query:: + + sh$ curl -sS -H 'Content-Type: application/json' \ + ... -X PUT localhost:9200/_plugins/_query/settings \ + ... 
-d '{"transient" : {"plugins.ppl.values.max.limit" : "0"}}' + { + "acknowledged": true, + "persistent": {}, + "transient": { + "plugins": { + "ppl": { + "values": { + "max": { + "limit": "0" + } + } + } + } + } + } diff --git a/docs/user/ppl/cmd/stats.rst b/docs/user/ppl/cmd/stats.rst index 872e2787d09..20b4873d5e7 100644 --- a/docs/user/ppl/cmd/stats.rst +++ b/docs/user/ppl/cmd/stats.rst @@ -34,6 +34,8 @@ The following table dataSources the aggregation functions and also indicates how +----------+-------------+-------------+ | LIST | Ignore | Ignore | +----------+-------------+-------------+ +| VALUES | Ignore | Ignore | ++----------+-------------+-------------+ Syntax @@ -577,6 +579,52 @@ Example with result field rename:: | ["Amber","Hattie","Nanette","Dale"] | +-------------------------------------+ +VALUES +------ + +Description +>>>>>>>>>>> + +Version: 3.3.0 (Calcite engine only) + +Usage: VALUES(expr). Collects all unique values from the specified expression into a sorted array. Values are converted to strings, nulls are filtered, and duplicates are removed. 
+ +The maximum number of unique values returned is controlled by the ``plugins.ppl.values.max.limit`` setting: + +* Default value is 0, which means unlimited values are returned +* Can be configured to any positive integer to limit the number of unique values +* See the `PPL Settings <../admin/settings.rst#plugins-ppl-values-max-limit>`_ documentation for more details + +Example with string fields:: + + PPL> source=accounts | stats values(firstname); + fetched rows / total rows = 1/1 + +-------------------------------------+ + | values(firstname) | + |-------------------------------------| + | ["Amber","Dale","Hattie","Nanette"] | + +-------------------------------------+ + +Example with numeric fields (sorted as strings):: + + PPL> source=accounts | stats values(age); + fetched rows / total rows = 1/1 + +-----------------------+ + | values(age) | + |-----------------------| + | ["28","32","33","36"] | + +-----------------------+ + +Example with result field rename:: + + PPL> source=accounts | stats values(firstname) as unique_names; + fetched rows / total rows = 1/1 + +-------------------------------------+ + | unique_names | + |-------------------------------------| + | ["Amber","Dale","Hattie","Nanette"] | + +-------------------------------------+ + Example 1: Calculate the count of events ======================================== @@ -833,3 +881,17 @@ PPL query:: | 1 | hattiebond@netagy.com | +-----+-----------------------+ +Example 16: Collect unique values in a field using VALUES +========================================================== + +The example shows how to collect all unique firstname values, sorted lexicographically with duplicates removed. 
+ +PPL query:: + + PPL> source=accounts | stats values(firstname); + fetched rows / total rows = 1/1 + +-------------------------------------+ + | values(firstname) | + |-------------------------------------| + | ["Amber","Dale","Hattie","Nanette"] | + +-------------------------------------+ diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index c4f02502813..a2ada89a84a 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -501,6 +501,15 @@ public void testListAggregationExplain() throws IOException { "source=opensearch-sql_test_index_account | stats list(age) as age_list")); } + @Test + public void testValuesAggregationExplain() throws IOException { + String expected = loadExpectedPlan("explain_values_aggregation.json"); + assertJsonEqualsIgnoreId( + expected, + explainQueryToString( + "source=opensearch-sql_test_index_account | stats values(age) as age_values")); + } + @Test public void testRegexExplain() throws IOException { String query = diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultiValueStatsIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultiValueStatsIT.java index e8bebaf291c..c374f8bbb29 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultiValueStatsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultiValueStatsIT.java @@ -5,6 +5,8 @@ package org.opensearch.sql.calcite.remote; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_CALCS; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_DATATYPE_NONNUMERIC; import static 
org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_DATATYPE_NUMERIC; @@ -15,6 +17,7 @@ import java.io.IOException; import java.util.List; +import org.json.JSONArray; import org.json.JSONObject; import org.junit.jupiter.api.Test; import org.opensearch.sql.ppl.PPLIntegTestCase; @@ -274,4 +277,204 @@ public void testListFunctionWithArithmeticExpression() throws IOException { verifySchema(response, schema("arithmetic_list", "array")); verifyDataRows(response, rows(List.of("9", "14", "3"))); } + + // ==================== VALUES Function Tests ==================== + + @Test + public void testValuesFunctionWithBoolean() throws IOException { + JSONObject response = + executeQuery( + String.format( + "source=%s | stats values(boolean_value) as bool_values", + TEST_INDEX_DATATYPE_NONNUMERIC)); + verifySchema(response, schema("bool_values", "array")); + // VALUES returns unique values sorted lexicographically + verifyDataRows(response, rows(List.of("true"))); + } + + @Test + public void testValuesFunctionWithInteger() throws IOException { + JSONObject response = + executeQuery( + String.format( + "source=%s | stats values(integer_number) as int_values", + TEST_INDEX_DATATYPE_NUMERIC)); + verifySchema(response, schema("int_values", "array")); + verifyDataRows(response, rows(List.of("2"))); + } + + @Test + public void testValuesFunctionWithString() throws IOException { + JSONObject response = + executeQuery( + String.format( + "source=%s | stats values(keyword_value) as keyword_values", + TEST_INDEX_DATATYPE_NONNUMERIC)); + verifySchema(response, schema("keyword_values", "array")); + verifyDataRows(response, rows(List.of("keyword"))); + } + + @Test + public void testValuesFunctionWithDuplicates() throws IOException { + // Test that VALUES deduplicates values + JSONObject response = + executeQuery( + String.format( + "source=%s | head 10 | stats values(bool0) as unique_bool_values", + TEST_INDEX_CALCS)); + verifySchema(response, schema("unique_bool_values", "array")); + // 
VALUES should return unique values only, sorted lexicographically + // The actual values depend on the test data - bool0 contains true/false values + assert response.has("datarows"); + // Verify that we get at most 2 unique boolean values (true/false) + assert response.getJSONArray("datarows").getJSONArray(0).getJSONArray(0).length() <= 2; + } + + @Test + public void testValuesFunctionWithNullValues() throws IOException { + JSONObject response = + executeQuery( + String.format( + "source=%s | head 5 | stats values(int0) as int_values", TEST_INDEX_CALCS)); + verifySchema(response, schema("int_values", "array")); + // Nulls are filtered out by values function + // VALUES returns sorted unique values + verifyDataRows(response, rows(List.of("1", "7"))); + } + + @Test + public void testValuesFunctionGroupBy() throws IOException { + JSONObject response = + executeQuery( + String.format( + "source=%s | head 5 | stats values(num0) as num_values by str0", TEST_INDEX_CALCS)); + verifySchema(response, schema("num_values", "array"), schema("str0", null, "string")); + + // Group by str0 field - should have different groups with their respective unique num0 values + // First 5 rows have: + // - FURNITURE: num0 values are 12.3, -12.3 + // - OFFICE SUPPLIES: num0 values are 15.7, -15.7, 3.5 + // VALUES returns unique values sorted lexicographically as strings + verifyDataRows( + response, + rows(List.of("-12.3", "12.3"), "FURNITURE"), + rows(List.of("-15.7", "15.7", "3.5"), "OFFICE SUPPLIES")); + } + + @Test + public void testValuesFunctionMultipleFields() throws IOException { + JSONObject response = + executeQuery( + String.format( + "source=%s | head 3 | stats values(str2) as str_values, values(int2) as int_values", + TEST_INDEX_CALCS)); + verifySchema(response, schema("str_values", "array"), schema("int_values", "array")); + + // VALUES should return unique sorted values for each field + assert response.has("datarows"); + // Values should be unique and sorted lexicographically 
+ verifyDataRows(response, rows(List.of("one", "three", "two"), List.of("-4", "5"))); + } + + @Test + public void testValuesFunctionWithObjectField() throws IOException { + JSONObject response = + executeQuery( + String.format( + "source=%s | stats values(object_value.first) as object_field_values", + TEST_INDEX_DATATYPE_NONNUMERIC)); + verifySchema(response, schema("object_field_values", "array")); + verifyDataRows(response, rows(List.of("Dale"))); + } + + @Test + public void testValuesFunctionEmptyResult() throws IOException { + JSONObject response = + executeQuery( + String.format( + "source=%s | where str0 = 'NONEXISTENT' | stats values(num0) as empty_values", + TEST_INDEX_CALCS)); + verifySchema(response, schema("empty_values", "array")); + + assert response.has("datarows"); + // When no records match, VALUES returns null (not an empty list) + verifyDataRows(response, rows((List) null)); + } + + @Test + public void testValuesFunctionWithUnlimitedValues() throws IOException { + // This test verifies that when the limit is set to 0 (unlimited), + // all unique values are returned + JSONObject response = + executeQuery( + String.format( + "source=%s | head 100 | stats values(int2) as all_values", TEST_INDEX_CALCS)); + verifySchema(response, schema("all_values", "array")); + + // With the default setting of 0 (unlimited), all unique values should be returned + // The actual number depends on the test data + assert response.has("datarows"); + JSONArray rows = response.getJSONArray("datarows"); + assertNotNull(rows); + assertTrue(rows.length() > 0); + } + + @Test + public void testValuesFunctionRespectsConfiguredLimit() throws IOException, InterruptedException { + // Test 1: Set limit to 3 and verify only 3 values are returned + updateClusterSettings(new ClusterSetting(TRANSIENT, "plugins.ppl.values.max.limit", "3")); + + // Wait a moment for the setting to propagate + Thread.sleep(1000); + + JSONObject response = + executeQuery( + String.format("source=%s | stats 
values(int2) as limited_values", TEST_INDEX_CALCS)); + verifySchema(response, schema("limited_values", "array")); + + assert response.has("datarows"); + JSONArray rows = response.getJSONArray("datarows"); + assertNotNull(rows); + assertTrue(rows.length() > 0); + + if (!rows.isNull(0)) { + JSONArray values = rows.getJSONArray(0).getJSONArray(0); + assertNotNull(values); + // With limit set to 3, should have at most 3 values + assertTrue( + "Expected at most 3 values with limit=3, but got " + values.length() + ": " + values, + values.length() <= 3); + + // Verify values are in lexicographical order + for (int i = 1; i < values.length(); i++) { + String prev = values.getString(i - 1); + String curr = values.getString(i); + assertTrue(prev.compareTo(curr) <= 0); + } + } + + // Test 2: Set limit to 0 (unlimited) and verify more values are returned + updateClusterSettings(new ClusterSetting(TRANSIENT, "plugins.ppl.values.max.limit", "0")); + + response = + executeQuery( + String.format("source=%s | stats values(int2) as unlimited_values", TEST_INDEX_CALCS)); + verifySchema(response, schema("unlimited_values", "array")); + + rows = response.getJSONArray("datarows"); + assertNotNull(rows); + + if (!rows.isNull(0)) { + JSONArray unlimitedValues = rows.getJSONArray(0).getJSONArray(0); + assertNotNull(unlimitedValues); + // With limit 0 (unlimited), should have all unique values from the dataset + // The test data has more than 3 unique values, so this should be > 3 + assertTrue( + "Expected more than 3 values with unlimited setting, but got " + unlimitedValues.length(), + unlimitedValues.length() > 3); + } + + // Reset the setting to default + updateClusterSettings(new ClusterSetting(TRANSIENT, "plugins.ppl.values.max.limit", null)); + } } diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_values_aggregation.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_values_aggregation.json new file mode 100644 index 00000000000..1dd0bc574f0 
--- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_values_aggregation.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalAggregate(group=[{}], age_values=[VALUES($0)])\n LogicalProject(age=[$8])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n", + "physical": "EnumerableLimit(fetch=[10000])\n EnumerableAggregate(group=[{}], age_values=[VALUES($0)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" + } +} \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_values_aggregation.json b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_values_aggregation.json new file mode 100644 index 00000000000..7fc9ba40c61 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_values_aggregation.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalAggregate(group=[{}], age_values=[VALUES($0)])\n LogicalProject(age=[$8])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n", + "physical": "EnumerableLimit(fetch=[10000])\n EnumerableAggregate(group=[{}], age_values=[VALUES($8)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n" + } +} \ No newline at end of file diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index 53da492ba4d..14f5b099d9f 100644 --- 
a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -114,6 +114,14 @@ public class OpenSearchSettings extends Settings { Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting PPL_VALUES_MAX_LIMIT_SETTING = + Setting.intSetting( + Key.PPL_VALUES_MAX_LIMIT.getKeyValue(), + 0, + 0, + Setting.Property.NodeScope, + Setting.Property.Dynamic); + public static final Setting CALCITE_ENGINE_ENABLED_SETTING = Setting.boolSetting( Key.CALCITE_ENGINE_ENABLED.getKeyValue(), @@ -361,6 +369,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) { Key.PPL_REX_MAX_MATCH_LIMIT, PPL_REX_MAX_MATCH_LIMIT_SETTING, new Updater(Key.PPL_REX_MAX_MATCH_LIMIT)); + register( + settingBuilder, + clusterSettings, + Key.PPL_VALUES_MAX_LIMIT, + PPL_VALUES_MAX_LIMIT_SETTING, + new Updater(Key.PPL_VALUES_MAX_LIMIT)); register( settingBuilder, clusterSettings, @@ -574,6 +588,7 @@ public static List> pluginSettings() { .add(DEFAULT_PATTERN_MAX_SAMPLE_COUNT_SETTING) .add(DEFAULT_PATTERN_BUFFER_LIMIT_SETTING) .add(PPL_REX_MAX_MATCH_LIMIT_SETTING) + .add(PPL_VALUES_MAX_LIMIT_SETTING) .add(QUERY_MEMORY_LIMIT_SETTING) .add(QUERY_SIZE_LIMIT_SETTING) .add(METRICS_ROLLING_WINDOW_SETTING) diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/setting/OpenSearchSettingsTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/setting/OpenSearchSettingsTest.java index 9bf70602e8b..5024d416086 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/setting/OpenSearchSettingsTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/setting/OpenSearchSettingsTest.java @@ -96,6 +96,25 @@ void update() { assertNotEquals(newValue.getBytes(), oldValue.getBytes()); } + @Test + void testPplValuesMaxLimitSetting() { + 
when(clusterSettings.get(ClusterName.CLUSTER_NAME_SETTING)).thenReturn(ClusterName.DEFAULT); + when(clusterSettings.get(not((eq(ClusterName.CLUSTER_NAME_SETTING))))).thenReturn(null); + OpenSearchSettings settings = new OpenSearchSettings(clusterSettings); + + // Test default value is 0 (unlimited) + Integer defaultLimit = settings.getSettingValue(Settings.Key.PPL_VALUES_MAX_LIMIT); + assertEquals(0, defaultLimit); + + // Test setting update + OpenSearchSettings.Updater updater = settings.new Updater(Settings.Key.PPL_VALUES_MAX_LIMIT); + updater.accept(5000); + + // Test retrieval after update + Integer newLimit = settings.getSettingValue(Settings.Key.PPL_VALUES_MAX_LIMIT); + assertEquals(5000, newLimit); + } + @Test void getSparkExecutionEngineConfigSetting() { // Default is empty string diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 1b8399c3e7f..0bc7b784338 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -648,6 +648,7 @@ statsFunction | PERCENTILE_SHORTCUT LT_PRTHS valueExpression RT_PRTHS # percentileShortcutFunctionCall | (DISTINCT_COUNT | DC | DISTINCT_COUNT_APPROX) LT_PRTHS valueExpression RT_PRTHS # distinctCountFunctionCall | takeAggFunction # takeAggFunctionCall + | valuesAggFunction # valuesAggFunctionCall | percentileApproxFunction # percentileApproxFunctionCall | statsFunctionName LT_PRTHS functionArgs RT_PRTHS # statsFunctionCall ; @@ -676,6 +677,10 @@ takeAggFunction : TAKE LT_PRTHS fieldExpression (COMMA size = integerLiteral)? RT_PRTHS ; +valuesAggFunction + : VALUES LT_PRTHS valueExpression RT_PRTHS + ; + percentileApproxFunction : (PERCENTILE | PERCENTILE_APPROX) LT_PRTHS aggField = valueExpression COMMA percent = numericLiteral (COMMA compression = numericLiteral)? 
RT_PRTHS diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index b848db7c551..774eb73dff6 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -144,6 +144,10 @@ public AstBuilder(String query, Settings settings) { this.settings = settings; } + public Settings getSettings() { + return settings; + } + @Override public UnresolvedPlan visitQueryStatement(OpenSearchPPLParser.QueryStatementContext ctx) { UnresolvedPlan pplCommand = visit(ctx.pplCommands()); diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java index 79be1ca0e9f..86d76f09890 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java @@ -242,10 +242,37 @@ public UnresolvedExpression visitStatsFunctionCall(StatsFunctionCallContext ctx) ctx.statsFunctionName().getText(), ctx.functionArgs().functionArg()); } + @Override + public UnresolvedExpression visitValuesAggFunctionCall( + OpenSearchPPLParser.ValuesAggFunctionCallContext ctx) { + ImmutableList.Builder builder = ImmutableList.builder(); + + // Get limit from settings + int limit = 0; // Default to unlimited + if (astBuilder.getSettings() != null) { + Integer settingValue = + astBuilder + .getSettings() + .getSettingValue(org.opensearch.sql.common.setting.Settings.Key.PPL_VALUES_MAX_LIMIT); + if (settingValue != null) { + limit = settingValue; + } + } + + // Only add limit parameter if it's non-zero (i.e., explicitly configured) + if (limit > 0) { + builder.add(new UnresolvedArgument("limit", AstDSL.intLiteral(limit))); + } + + return new AggregateFunction( + "values", visit(ctx.valuesAggFunction().valueExpression()), builder.build()); + } + private 
AggregateFunction buildAggregateFunction( String functionName, List args) { List unresolvedArgs = args.stream().map(this::visitFunctionArg).collect(Collectors.toList()); + return new AggregateFunction( functionName, unresolvedArgs.get(0), unresolvedArgs.subList(1, unresolvedArgs.size())); } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEvalTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEvalTest.java index d1acdd168b4..285c084e936 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEvalTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEvalTest.java @@ -469,4 +469,52 @@ public void testListAggregationWithGroupBy() { + "GROUP BY `DEPTNO`"; verifyPPLToSparkSQL(root, expectedSparkSql); } + + /** Checks that {@code stats values(DEPTNO)} without grouping yields a single VALUES aggregate over the projected DEPTNO column, and compares the expected Spark SQL string for that plan. */ @Test + public void testValuesAggregationAlone() { + String ppl = "source=EMP | stats values(DEPTNO)"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalAggregate(group=[{}], values(DEPTNO)=[VALUES($0)])\n" + + " LogicalProject(DEPTNO=[$7])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = "SELECT `VALUES`(`DEPTNO`) `values(DEPTNO)`\n" + "FROM `scott`.`EMP`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + /** Checks that VALUES and COUNT can appear in the same stats command, both aggregating the same projected DEPTNO column in one LogicalAggregate. */ @Test + public void testValuesAggregationWithOtherAgg() { + String ppl = "source=EMP | stats values(DEPTNO), count(DEPTNO)"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalAggregate(group=[{}], values(DEPTNO)=[VALUES($0)], count(DEPTNO)=[COUNT($0)])\n" + + " LogicalProject(DEPTNO=[$7])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `VALUES`(`DEPTNO`) `values(DEPTNO)`, COUNT(`DEPTNO`) `count(DEPTNO)`\n" + + "FROM `scott`.`EMP`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + /** Checks {@code stats values(ENAME) by DEPTNO}: the aggregate is grouped by DEPTNO and a final projection places the aggregate column before the group key. */ @Test + public void testValuesAggregationWithGroupBy() { + String ppl = "source=EMP | stats
values(ENAME) by DEPTNO"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(values(ENAME)=[$1], DEPTNO=[$0])\n" + + " LogicalAggregate(group=[{0}], values(ENAME)=[VALUES($1)])\n" + + " LogicalProject(DEPTNO=[$7], ENAME=[$1])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `VALUES`(`ENAME`) `values(ENAME)`, `DEPTNO`\n" + + "FROM `scott`.`EMP`\n" + + "GROUP BY `DEPTNO`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLFunctionTypeTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLFunctionTypeTest.java index 855cd17db68..fc840169ee2 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLFunctionTypeTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLFunctionTypeTest.java @@ -286,6 +286,17 @@ public void testStrftimeWithWrongNumberOfArgs() { + " [INTEGER,STRING,STRING]"); } + // Test VALUES function with array expression (which is not a supported scalar type) + @Test + public void testValuesFunctionWithArrayArgType() { + verifyQueryThrowsException( + "source=EMP | stats values(array(ENAME, JOB)) as unique_values", + "Aggregation function VALUES expects field type" + + " {[BYTE]|[SHORT]|[INTEGER]|[LONG]|[FLOAT]|[DOUBLE]|[STRING]|[BOOLEAN]|[DATE]|[TIME]|[TIMESTAMP]|[IP]|[BINARY]|[BYTE,INTEGER]" + + "|[SHORT,INTEGER]|[INTEGER,INTEGER]|[LONG,INTEGER]|[FLOAT,INTEGER]|[DOUBLE,INTEGER]|[STRING,INTEGER]|[BOOLEAN,INTEGER]|[DATE,INTEGER]|[TIME,INTEGER]|[TIMESTAMP,INTEGER]|[IP,INTEGER]|[BINARY,INTEGER]}," + + " but got [ARRAY]"); + } + // mvjoin should reject non-string single values @Test public void testMvjoinRejectsNonStringValues() {