From 2f9b260331ec4dfcaedc3073aa8240cc6824112c Mon Sep 17 00:00:00 2001 From: Tomoyuki MORITA Date: Wed, 3 Sep 2025 09:19:55 -0700 Subject: [PATCH 1/3] Pushdown earliest/latest aggregate functions (#4166) * Pushdown earliest/latest aggregate functions Signed-off-by: Tomoyuki Morita * Fix spark sql Signed-off-by: Tomoyuki Morita --------- Signed-off-by: Tomoyuki Morita --- .../calcite/explain_earliest_latest.json | 2 +- .../explain_earliest_latest_custom_time.json | 2 +- .../opensearch/request/AggregateAnalyzer.java | 88 +++++++++++-------- .../response/agg/ArgMaxMinParser.java | 39 ++++++++ .../ppl/calcite/CalcitePPLAbstractTest.java | 5 +- .../calcite/CalcitePPLEarliestLatestTest.java | 14 +-- .../calcite/OpenSearchSparkSqlDialect.java | 58 ++++++++++++ 7 files changed, 157 insertions(+), 51 deletions(-) create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/ArgMaxMinParser.java create mode 100644 ppl/src/test/java/org/opensearch/sql/ppl/calcite/OpenSearchSparkSqlDialect.java diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_earliest_latest.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_earliest_latest.json index 46143e0c429..8f794d560d6 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_earliest_latest.json +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_earliest_latest.json @@ -1,6 +1,6 @@ { "calcite": { "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(earliest_message=[$1], latest_message=[$2], server=[$0])\n LogicalAggregate(group=[{0}], earliest_message=[ARG_MIN($1, $2)], latest_message=[ARG_MAX($1, $2)])\n LogicalProject(server=[$1], message=[$3], @timestamp=[$2])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n", - "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..2=[{inputs}], earliest_message=[$t1], latest_message=[$t2], server=[$t0])\n EnumerableAggregate(group=[{0}], earliest_message=[ARG_MIN($1, $2)], latest_message=[ARG_MAX($1, $2)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[PROJECT->[server, message, @timestamp]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"server\",\"message\",\"@timestamp\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" + "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..2=[{inputs}], earliest_message=[$t1], latest_message=[$t2], server=[$t0])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},earliest_message=ARG_MIN($1, $2),latest_message=ARG_MAX($1, $2))], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"server\":{\"terms\":{\"field\":\"server\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"earliest_message\":{\"top_hits\":{\"from\":0,\"size\":1,\"version\":false,\"seq_no_primary_term\":false,\"explain\":false,\"_source\":{\"includes\":[\"message\"],\"excludes\":[]},\"sort\":[{\"@timestamp\":{\"order\":\"asc\"}}]}},\"latest_message\":{\"top_hits\":{\"from\":0,\"size\":1,\"version\":false,\"seq_no_primary_term\":false,\"explain\":false,\"_source\":{\"includes\":[\"message\"],\"excludes\":[]},\"sort\":[{\"@timestamp\":{\"order\":\"desc\"}}]}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" } } diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_earliest_latest_custom_time.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_earliest_latest_custom_time.json index a3d2d3a634c..d4bce620b6c 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_earliest_latest_custom_time.json +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_earliest_latest_custom_time.json @@ -1,6 +1,6 @@ { "calcite": { "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(earliest_message=[$1], latest_message=[$2], level=[$0])\n LogicalAggregate(group=[{0}], earliest_message=[ARG_MIN($1, $2)], latest_message=[ARG_MAX($1, $2)])\n LogicalProject(level=[$4], message=[$3], created_at=[$0])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n", - "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..2=[{inputs}], earliest_message=[$t1], latest_message=[$t2], level=[$t0])\n EnumerableAggregate(group=[{0}], earliest_message=[ARG_MIN($1, $2)], latest_message=[ARG_MAX($1, $2)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[PROJECT->[level, message, created_at]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"level\",\"message\",\"created_at\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" + "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..2=[{inputs}], earliest_message=[$t1], latest_message=[$t2], level=[$t0])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},earliest_message=ARG_MIN($1, $2),latest_message=ARG_MAX($1, $2))], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"level\":{\"terms\":{\"field\":\"level\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"earliest_message\":{\"top_hits\":{\"from\":0,\"size\":1,\"version\":false,\"seq_no_primary_term\":false,\"explain\":false,\"_source\":{\"includes\":[\"message\"],\"excludes\":[]},\"sort\":[{\"created_at\":{\"order\":\"asc\"}}]}},\"latest_message\":{\"top_hits\":{\"from\":0,\"size\":1,\"version\":false,\"seq_no_primary_term\":false,\"explain\":false,\"_source\":{\"includes\":[\"message\"],\"excludes\":[]},\"sort\":[{\"created_at\":{\"order\":\"desc\"}}]}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java index f0e60362ba1..9476a61b569 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java @@ -69,6 +69,7 @@ import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.expression.function.BuiltinFunctionName; import org.opensearch.sql.opensearch.request.PredicateAnalyzer.NamedFieldExpression; +import org.opensearch.sql.opensearch.response.agg.ArgMaxMinParser; import org.opensearch.sql.opensearch.response.agg.CompositeAggregationParser; import org.opensearch.sql.opensearch.response.agg.MetricParser; import org.opensearch.sql.opensearch.response.agg.NoBucketAggregationParser; @@ -267,45 +268,54 @@ private static Pair createRegularAggregation( String aggFieldName, AggregateBuilderHelper helper) { - switch (aggCall.getAggregation().kind) { - case AVG: - return Pair.of( - helper.build(args.get(0), AggregationBuilders.avg(aggFieldName)), - new SingleValueParser(aggFieldName)); - case SUM: - return Pair.of( - helper.build(args.get(0), AggregationBuilders.sum(aggFieldName)), - new SingleValueParser(aggFieldName)); - case COUNT: - return Pair.of( - helper.build( - !args.isEmpty() ? args.get(0) : null, AggregationBuilders.count(aggFieldName)), - new SingleValueParser(aggFieldName)); - case MIN: - return Pair.of( - helper.build(args.get(0), AggregationBuilders.min(aggFieldName)), - new SingleValueParser(aggFieldName)); - case MAX: - return Pair.of( - helper.build(args.get(0), AggregationBuilders.max(aggFieldName)), - new SingleValueParser(aggFieldName)); - case VAR_SAMP: - return Pair.of( - helper.build(args.get(0), AggregationBuilders.extendedStats(aggFieldName)), - new StatsParser(ExtendedStats::getVarianceSampling, aggFieldName)); - case VAR_POP: - return Pair.of( - helper.build(args.get(0), AggregationBuilders.extendedStats(aggFieldName)), - new StatsParser(ExtendedStats::getVariancePopulation, aggFieldName)); - case STDDEV_SAMP: - return Pair.of( - helper.build(args.get(0), AggregationBuilders.extendedStats(aggFieldName)), - new StatsParser(ExtendedStats::getStdDeviationSampling, aggFieldName)); - case STDDEV_POP: - return Pair.of( - helper.build(args.get(0), AggregationBuilders.extendedStats(aggFieldName)), - new StatsParser(ExtendedStats::getStdDeviationPopulation, aggFieldName)); - case OTHER_FUNCTION: + return switch (aggCall.getAggregation().kind) { + case AVG -> Pair.of( + helper.build(args.getFirst(), AggregationBuilders.avg(aggFieldName)), + new SingleValueParser(aggFieldName)); + case SUM -> Pair.of( + helper.build(args.getFirst(), AggregationBuilders.sum(aggFieldName)), + new SingleValueParser(aggFieldName)); + case COUNT -> Pair.of( + helper.build( + !args.isEmpty() ? args.getFirst() : null, AggregationBuilders.count(aggFieldName)), + new SingleValueParser(aggFieldName)); + case MIN -> Pair.of( + helper.build(args.getFirst(), AggregationBuilders.min(aggFieldName)), + new SingleValueParser(aggFieldName)); + case MAX -> Pair.of( + helper.build(args.getFirst(), AggregationBuilders.max(aggFieldName)), + new SingleValueParser(aggFieldName)); + case VAR_SAMP -> Pair.of( + helper.build(args.getFirst(), AggregationBuilders.extendedStats(aggFieldName)), + new StatsParser(ExtendedStats::getVarianceSampling, aggFieldName)); + case VAR_POP -> Pair.of( + helper.build(args.getFirst(), AggregationBuilders.extendedStats(aggFieldName)), + new StatsParser(ExtendedStats::getVariancePopulation, aggFieldName)); + case STDDEV_SAMP -> Pair.of( + helper.build(args.getFirst(), AggregationBuilders.extendedStats(aggFieldName)), + new StatsParser(ExtendedStats::getStdDeviationSampling, aggFieldName)); + case STDDEV_POP -> Pair.of( + helper.build(args.getFirst(), AggregationBuilders.extendedStats(aggFieldName)), + new StatsParser(ExtendedStats::getStdDeviationPopulation, aggFieldName)); + case ARG_MAX -> Pair.of( + AggregationBuilders.topHits(aggFieldName) + .fetchSource(helper.inferNamedField(args.getFirst()).getRootName(), null) + .size(1) + .from(0) + .sort( + helper.inferNamedField(args.get(1)).getRootName(), + org.opensearch.search.sort.SortOrder.DESC), + new ArgMaxMinParser(aggFieldName)); + case ARG_MIN -> Pair.of( + AggregationBuilders.topHits(aggFieldName) + .fetchSource(helper.inferNamedField(args.getFirst()).getRootName(), null) + .size(1) + .from(0) + .sort( + helper.inferNamedField(args.get(1)).getRootName(), + org.opensearch.search.sort.SortOrder.ASC), + new ArgMaxMinParser(aggFieldName)); + case OTHER_FUNCTION -> { BuiltinFunctionName functionName = BuiltinFunctionName.ofAggregation(aggCall.getAggregation().getName()).get(); switch (functionName) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/ArgMaxMinParser.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/ArgMaxMinParser.java new file mode 100644 index 00000000000..2ff1e511713 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/ArgMaxMinParser.java @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.response.agg; + +import java.util.Collections; +import java.util.Map; +import lombok.Value; +import org.opensearch.search.SearchHit; +import org.opensearch.search.aggregations.Aggregation; +import org.opensearch.search.aggregations.metrics.TopHits; + +/** {@link TopHits} metric parser for ARG_MAX/ARG_MIN aggregations. */ +@Value +public class ArgMaxMinParser implements MetricParser { + + String name; + + @Override + public Map parse(Aggregation agg) { + TopHits topHits = (TopHits) agg; + SearchHit[] hits = topHits.getHits().getHits(); + + if (hits.length == 0) { + return Collections.singletonMap(agg.getName(), null); + } + + Map source = hits[0].getSourceAsMap(); + + if (source.isEmpty()) { + return Collections.singletonMap(agg.getName(), null); + } else { + Object value = source.values().iterator().next(); + return Collections.singletonMap(agg.getName(), value); + } + } +} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java index f40e90ff0e6..652d6e77e3a 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java @@ -26,7 +26,6 @@ import org.apache.calcite.rel.rel2sql.SqlImplementor; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.dialect.SparkSqlDialect; import org.apache.calcite.sql.parser.SqlParser; import org.apache.calcite.test.CalciteAssert; import org.apache.calcite.tools.Frameworks; @@ -53,7 +52,7 @@ public class CalcitePPLAbstractTest { public CalcitePPLAbstractTest(CalciteAssert.SchemaSpec... schemaSpecs) { this.config = config(schemaSpecs); this.planTransformer = new CalciteRelNodeVisitor(); - this.converter = new RelToSqlConverter(SparkSqlDialect.DEFAULT); + this.converter = new RelToSqlConverter(OpenSearchSparkSqlDialect.DEFAULT); this.settings = mock(Settings.class); } @@ -160,7 +159,7 @@ public void verifyPPLToSparkSQL(RelNode rel, String expected) { String normalized = expected.replace("\n", System.lineSeparator()); SqlImplementor.Result result = converter.visitRoot(rel); final SqlNode sqlNode = result.asStatement(); - final String sql = sqlNode.toSqlString(SparkSqlDialect.DEFAULT).getSql(); + final String sql = sqlNode.toSqlString(OpenSearchSparkSqlDialect.DEFAULT).getSql(); assertThat(sql, is(normalized)); } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEarliestLatestTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEarliestLatestTest.java index 16a57bacb57..c9c260cd444 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEarliestLatestTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEarliestLatestTest.java @@ -107,7 +107,7 @@ public void testEarliestWithoutSecondArgument() { verifyResult(root, expectedResult); String expectedSparkSql = - "SELECT ARG_MIN(`message`, `@timestamp`) `earliest_message`\n" + "FROM `POST`.`LOGS`"; + "SELECT MIN_BY (`message`, `@timestamp`) `earliest_message`\n" + "FROM `POST`.`LOGS`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -125,7 +125,7 @@ public void testLatestWithoutSecondArgument() { verifyResult(root, expectedResult); String expectedSparkSql = - "SELECT ARG_MAX(`message`, `@timestamp`) `latest_message`\n" + "FROM `POST`.`LOGS`"; + "SELECT MAX_BY (`message`, `@timestamp`) `latest_message`\n" + "FROM `POST`.`LOGS`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -147,7 +147,7 @@ public void testEarliestByServerWithoutSecondArgument() { verifyResult(root, expectedResult); String expectedSparkSql = - "SELECT ARG_MIN(`message`, `@timestamp`) `earliest_message`, `server`\n" + "SELECT MIN_BY (`message`, `@timestamp`) `earliest_message`, `server`\n" + "FROM `POST`.`LOGS`\n" + "GROUP BY `server`"; verifyPPLToSparkSQL(root, expectedSparkSql); @@ -171,7 +171,7 @@ public void testLatestByServerWithoutSecondArgument() { verifyResult(root, expectedResult); String expectedSparkSql = - "SELECT ARG_MAX(`message`, `@timestamp`) `latest_message`, `server`\n" + "SELECT MAX_BY (`message`, `@timestamp`) `latest_message`, `server`\n" + "FROM `POST`.`LOGS`\n" + "GROUP BY `server`"; verifyPPLToSparkSQL(root, expectedSparkSql); @@ -196,7 +196,7 @@ public void testEarliestWithOtherAggregatesWithoutSecondArgument() { verifyResult(root, expectedResult); String expectedSparkSql = - "SELECT ARG_MIN(`message`, `@timestamp`) `earliest_message`, " + "SELECT MIN_BY (`message`, `@timestamp`) `earliest_message`, " + "COUNT(*) `cnt`, `server`\n" + "FROM `POST`.`LOGS`\n" + "GROUP BY `server`"; @@ -217,7 +217,7 @@ public void testEarliestWithExplicitTimestampField() { verifyResult(root, expectedResult); String expectedSparkSql = - "SELECT ARG_MIN(`message`, `created_at`) `earliest_message`\n" + "FROM `POST`.`LOGS`"; + "SELECT MIN_BY (`message`, `created_at`) `earliest_message`\n" + "FROM `POST`.`LOGS`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -235,7 +235,7 @@ public void testLatestWithExplicitTimestampField() { verifyResult(root, expectedResult); String expectedSparkSql = - "SELECT ARG_MAX(`message`, `created_at`) `latest_message`\n" + "FROM `POST`.`LOGS`"; + "SELECT MAX_BY (`message`, `created_at`) `latest_message`\n" + "FROM `POST`.`LOGS`"; verifyPPLToSparkSQL(root, expectedSparkSql); } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/OpenSearchSparkSqlDialect.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/OpenSearchSparkSqlDialect.java new file mode 100644 index 00000000000..24ddedd2562 --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/OpenSearchSparkSqlDialect.java @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl.calcite; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.dialect.SparkSqlDialect; + +/** + * Custom Spark SQL dialect that extends Calcite's SparkSqlDialect to handle OpenSearch-specific + * function translations. This dialect ensures that functions are translated to their correct Spark + * SQL equivalents. + */ +public class OpenSearchSparkSqlDialect extends SparkSqlDialect { + + /** Singleton instance of the OpenSearch Spark SQL dialect. */ + public static final OpenSearchSparkSqlDialect DEFAULT = new OpenSearchSparkSqlDialect(); + + private static final Map CALCITE_TO_SPARK_MAPPING = + ImmutableMap.of( + "ARG_MIN", "MIN_BY", + "ARG_MAX", "MAX_BY"); + + private OpenSearchSparkSqlDialect() { + super(DEFAULT_CONTEXT); + } + + @Override + public void unparseCall(SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) { + String operatorName = call.getOperator().getName(); + + // Replace Calcite specific functions with their Spark SQL equivalents + if (CALCITE_TO_SPARK_MAPPING.containsKey(operatorName)) { + unparseFunction( + writer, call, CALCITE_TO_SPARK_MAPPING.get(operatorName), leftPrec, rightPrec); + } else { + super.unparseCall(writer, call, leftPrec, rightPrec); + } + } + + private void unparseFunction( + SqlWriter writer, SqlCall call, String functionName, int leftPrec, int rightPrec) { + writer.keyword(functionName); + final SqlWriter.Frame frame = writer.startList("(", ")"); + for (int i = 0; i < call.operandCount(); i++) { + if (i > 0) { + writer.sep(","); + } + call.operand(i).unparse(writer, leftPrec, rightPrec); + } + writer.endList(frame); + } +} From 8197bf7997a8ed6c5de002445d60e39df4a11e50 Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Thu, 4 Sep 2025 10:31:15 -0700 Subject: [PATCH 2/3] Fix for Java 11 Signed-off-by: Tomoyuki Morita --- .../opensearch/request/AggregateAnalyzer.java | 75 +++++++++++-------- 1 file changed, 43 insertions(+), 32 deletions(-) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java index 9476a61b569..08afeb7720d 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java @@ -268,36 +268,46 @@ private static Pair createRegularAggregation( String aggFieldName, AggregateBuilderHelper helper) { - return switch (aggCall.getAggregation().kind) { - case AVG -> Pair.of( - helper.build(args.getFirst(), AggregationBuilders.avg(aggFieldName)), - new SingleValueParser(aggFieldName)); - case SUM -> Pair.of( - helper.build(args.getFirst(), AggregationBuilders.sum(aggFieldName)), - new SingleValueParser(aggFieldName)); - case COUNT -> Pair.of( - helper.build( - !args.isEmpty() ? args.getFirst() : null, AggregationBuilders.count(aggFieldName)), - new SingleValueParser(aggFieldName)); - case MIN -> Pair.of( - helper.build(args.getFirst(), AggregationBuilders.min(aggFieldName)), - new SingleValueParser(aggFieldName)); - case MAX -> Pair.of( - helper.build(args.getFirst(), AggregationBuilders.max(aggFieldName)), - new SingleValueParser(aggFieldName)); - case VAR_SAMP -> Pair.of( - helper.build(args.getFirst(), AggregationBuilders.extendedStats(aggFieldName)), - new StatsParser(ExtendedStats::getVarianceSampling, aggFieldName)); - case VAR_POP -> Pair.of( - helper.build(args.getFirst(), AggregationBuilders.extendedStats(aggFieldName)), - new StatsParser(ExtendedStats::getVariancePopulation, aggFieldName)); - case STDDEV_SAMP -> Pair.of( - helper.build(args.getFirst(), AggregationBuilders.extendedStats(aggFieldName)), - new StatsParser(ExtendedStats::getStdDeviationSampling, aggFieldName)); - case STDDEV_POP -> Pair.of( - helper.build(args.getFirst(), AggregationBuilders.extendedStats(aggFieldName)), - new StatsParser(ExtendedStats::getStdDeviationPopulation, aggFieldName)); - case ARG_MAX -> Pair.of( + switch (aggCall.getAggregation().kind) { + case AVG: + return Pair.of( + helper.build(args.get(0), AggregationBuilders.avg(aggFieldName)), + new SingleValueParser(aggFieldName)); + case SUM: + return Pair.of( + helper.build(args.get(0), AggregationBuilders.sum(aggFieldName)), + new SingleValueParser(aggFieldName)); + case COUNT: + return Pair.of( + helper.build( + !args.isEmpty() ? args.get(0) : null, AggregationBuilders.count(aggFieldName)), + new SingleValueParser(aggFieldName)); + case MIN: + return Pair.of( + helper.build(args.get(0), AggregationBuilders.min(aggFieldName)), + new SingleValueParser(aggFieldName)); + case MAX: + return Pair.of( + helper.build(args.get(0), AggregationBuilders.max(aggFieldName)), + new SingleValueParser(aggFieldName)); + case VAR_SAMP: + return Pair.of( + helper.build(args.get(0), AggregationBuilders.extendedStats(aggFieldName)), + new StatsParser(ExtendedStats::getVarianceSampling, aggFieldName)); + case VAR_POP: + return Pair.of( + helper.build(args.get(0), AggregationBuilders.extendedStats(aggFieldName)), + new StatsParser(ExtendedStats::getVariancePopulation, aggFieldName)); + case STDDEV_SAMP: + return Pair.of( + helper.build(args.get(0), AggregationBuilders.extendedStats(aggFieldName)), + new StatsParser(ExtendedStats::getStdDeviationSampling, aggFieldName)); + case STDDEV_POP: + return Pair.of( + helper.build(args.get(0), AggregationBuilders.extendedStats(aggFieldName)), + new StatsParser(ExtendedStats::getStdDeviationPopulation, aggFieldName)); + case ARG_MAX: + return Pair.of( AggregationBuilders.topHits(aggFieldName) .fetchSource(helper.inferNamedField(args.getFirst()).getRootName(), null) .size(1) @@ -306,7 +316,8 @@ private static Pair createRegularAggregation( helper.inferNamedField(args.get(1)).getRootName(), org.opensearch.search.sort.SortOrder.DESC), new ArgMaxMinParser(aggFieldName)); - case ARG_MIN -> Pair.of( + case ARG_MIN: + return Pair.of( AggregationBuilders.topHits(aggFieldName) .fetchSource(helper.inferNamedField(args.getFirst()).getRootName(), null) .size(1) @@ -315,7 +326,7 @@ private static Pair createRegularAggregation( helper.inferNamedField(args.get(1)).getRootName(), org.opensearch.search.sort.SortOrder.ASC), new ArgMaxMinParser(aggFieldName)); - case OTHER_FUNCTION -> { + case OTHER_FUNCTION: BuiltinFunctionName functionName = BuiltinFunctionName.ofAggregation(aggCall.getAggregation().getName()).get(); switch (functionName) { From daac9c98251b9b5c165e8a9f408215902dd16d15 Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Thu, 4 Sep 2025 10:32:40 -0700 Subject: [PATCH 3/3] Fix for Java 11 Signed-off-by: Tomoyuki Morita --- .../opensearch/sql/opensearch/request/AggregateAnalyzer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java index 08afeb7720d..c746c7cb857 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java @@ -309,7 +309,7 @@ private static Pair createRegularAggregation( case ARG_MAX: return Pair.of( AggregationBuilders.topHits(aggFieldName) - .fetchSource(helper.inferNamedField(args.getFirst()).getRootName(), null) + .fetchSource(helper.inferNamedField(args.get(0)).getRootName(), null) .size(1) .from(0) .sort( @@ -319,7 +319,7 @@ private static Pair createRegularAggregation( case ARG_MIN: return Pair.of( AggregationBuilders.topHits(aggFieldName) - .fetchSource(helper.inferNamedField(args.getFirst()).getRootName(), null) + .fetchSource(helper.inferNamedField(args.get(0)).getRootName(), null) .size(1) .from(0) .sort(