-
Notifications
You must be signed in to change notification settings - Fork 181
Add non-numeric field support for max/min functions #4281
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
db99822
2201a96
87e5b6a
2580bcd
de51eb1
987809b
864e379
83b5f50
5aea131
6dbf487
1fbd3fb
67d23df
a648c2e
c3080d8
16edabe
d2200e8
e692b9d
27fe823
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| { | ||
| "calcite": { | ||
| "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalAggregate(group=[{}], max(firstname)=[MAX($0)])\n LogicalProject(firstname=[$1])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n", | ||
| "physical": "EnumerableLimit(fetch=[10000])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},max(firstname)=MAX($0))], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"max(firstname)\":{\"top_hits\":{\"from\":0,\"size\":1,\"version\":false,\"seq_no_primary_term\":false,\"explain\":false,\"_source\":{\"includes\":[\"firstname\"],\"excludes\":[]},\"sort\":[{\"firstname.keyword\":{\"order\":\"desc\"}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| { | ||
| "calcite": { | ||
| "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalAggregate(group=[{}], min(firstname)=[MIN($0)])\n LogicalProject(firstname=[$1])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n", | ||
| "physical": "EnumerableLimit(fetch=[10000])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},min(firstname)=MIN($0))], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"min(firstname)\":{\"top_hits\":{\"from\":0,\"size\":1,\"version\":false,\"seq_no_primary_term\":false,\"explain\":false,\"_source\":{\"includes\":[\"firstname\"],\"excludes\":[]},\"sort\":[{\"firstname.keyword\":{\"order\":\"asc\"}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| { | ||
| "calcite": { | ||
| "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalAggregate(group=[{}], max(firstname)=[MAX($0)])\n LogicalProject(firstname=[$1])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n", | ||
| "physical": "EnumerableLimit(fetch=[10000])\n EnumerableAggregate(group=[{}], max(firstname)=[MAX($1)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n" | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| { | ||
| "calcite": { | ||
| "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalAggregate(group=[{}], min(firstname)=[MIN($0)])\n LogicalProject(firstname=[$1])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n", | ||
| "physical": "EnumerableLimit(fetch=[10000])\n EnumerableAggregate(group=[{}], min(firstname)=[MIN($1)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n" | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| { | ||
dai-chen marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| "root": { | ||
| "type": "OpenSearchPlan", | ||
| "children": [ | ||
| { | ||
| "type": "LogicalAggregate", | ||
| "description": "group=[{}], max(firstname)=[MAX($0)]", | ||
| "children": [ | ||
| { | ||
| "type": "LogicalProject", | ||
| "description": "firstname=[$3]", | ||
| "children": [ | ||
| { | ||
| "type": "LogicalTableScan", | ||
| "description": "table=[[opensearch-sql_test_index_account]]" | ||
| } | ||
| ] | ||
| } | ||
| ] | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| { | ||
| "root": { | ||
| "type": "OpenSearchPlan", | ||
| "children": [ | ||
| { | ||
| "type": "LogicalAggregate", | ||
| "description": "group=[{}], min(firstname)=[MIN($0)]", | ||
| "children": [ | ||
| { | ||
| "type": "LogicalProject", | ||
| "description": "firstname=[$3]", | ||
| "children": [ | ||
| { | ||
| "type": "LogicalTableScan", | ||
| "description": "table=[[opensearch-sql_test_index_account]]" | ||
| } | ||
| ] | ||
| } | ||
| ] | ||
| } | ||
| ] | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -69,12 +69,15 @@ | |
| import org.opensearch.sql.ast.expression.Argument; | ||
| import org.opensearch.sql.ast.expression.SpanUnit; | ||
| import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory; | ||
| import org.opensearch.sql.data.type.ExprCoreType; | ||
| import org.opensearch.sql.data.type.ExprType; | ||
| import org.opensearch.sql.expression.function.BuiltinFunctionName; | ||
| import org.opensearch.sql.opensearch.data.type.OpenSearchTextType; | ||
| import org.opensearch.sql.opensearch.request.PredicateAnalyzer.NamedFieldExpression; | ||
| import org.opensearch.sql.opensearch.response.agg.ArgMaxMinParser; | ||
| import org.opensearch.sql.opensearch.response.agg.BucketAggregationParser; | ||
| import org.opensearch.sql.opensearch.response.agg.CompositeAggregationParser; | ||
| import org.opensearch.sql.opensearch.response.agg.MaxMinParser; | ||
| import org.opensearch.sql.opensearch.response.agg.MetricParser; | ||
| import org.opensearch.sql.opensearch.response.agg.NoBucketAggregationParser; | ||
| import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser; | ||
|
|
@@ -298,12 +301,46 @@ private static Pair<AggregationBuilder, MetricParser> createRegularAggregation( | |
| helper.build( | ||
| !args.isEmpty() ? args.getFirst() : null, AggregationBuilders.count(aggFieldName)), | ||
| new SingleValueParser(aggFieldName)); | ||
| case MIN -> Pair.of( | ||
| helper.build(args.getFirst(), AggregationBuilders.min(aggFieldName)), | ||
| new SingleValueParser(aggFieldName)); | ||
| case MAX -> Pair.of( | ||
| helper.build(args.getFirst(), AggregationBuilders.max(aggFieldName)), | ||
| new SingleValueParser(aggFieldName)); | ||
| case MIN -> { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. MIN and MAX logic seems same other than AggregationBuilders.min/max and ASC/DESC. Can we extract as a method for maintainability?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yup makes sense will add this and using |
||
| String fieldName = helper.inferNamedField(args.getFirst()).getRootName(); | ||
| ExprType fieldType = helper.fieldTypes.get(fieldName); | ||
|
|
||
| if (isStringType(fieldType)) { | ||
| yield Pair.of( | ||
| AggregationBuilders.topHits(aggFieldName) | ||
| .fetchSource(helper.inferNamedField(args.getFirst()).getRootName(), null) | ||
| .size(1) | ||
| .from(0) | ||
| .sort( | ||
| helper.inferNamedField(args.getFirst()).getReferenceForTermQuery(), | ||
| SortOrder.ASC), | ||
| new MaxMinParser(aggFieldName)); | ||
dai-chen marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } else { | ||
| yield Pair.of( | ||
| helper.build(args.getFirst(), AggregationBuilders.min(aggFieldName)), | ||
| new SingleValueParser(aggFieldName)); | ||
| } | ||
| } | ||
| case MAX -> { | ||
| String fieldName = helper.inferNamedField(args.getFirst()).getRootName(); | ||
| ExprType fieldType = helper.fieldTypes.get(fieldName); | ||
|
|
||
| if (isStringType(fieldType)) { | ||
| yield Pair.of( | ||
| AggregationBuilders.topHits(aggFieldName) | ||
| .fetchSource(helper.inferNamedField(args.getFirst()).getRootName(), null) | ||
| .size(1) | ||
| .from(0) | ||
| .sort( | ||
| helper.inferNamedField(args.getFirst()).getReferenceForTermQuery(), | ||
| SortOrder.DESC), | ||
| new MaxMinParser(aggFieldName)); | ||
| } else { | ||
| yield Pair.of( | ||
| helper.build(args.getFirst(), AggregationBuilders.max(aggFieldName)), | ||
| new SingleValueParser(aggFieldName)); | ||
| } | ||
| } | ||
| case VAR_SAMP -> Pair.of( | ||
| helper.build(args.getFirst(), AggregationBuilders.extendedStats(aggFieldName)), | ||
| new StatsParser(ExtendedStats::getVarianceSampling, aggFieldName)); | ||
|
|
@@ -383,6 +420,10 @@ yield switch (functionName) { | |
| }; | ||
| } | ||
|
|
||
| private static boolean isStringType(ExprType fieldType) { | ||
| return fieldType instanceof OpenSearchTextType || fieldType == ExprCoreType.STRING; | ||
| } | ||
dai-chen marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| private static ValuesSourceAggregationBuilder<?> createBucketAggregation( | ||
| Integer group, Project project, AggregateAnalyzer.AggregateBuilderHelper helper) { | ||
| return createBucket(group, project, helper); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.sql.opensearch.response.agg; | ||
|
|
||
| import java.util.Collections; | ||
| import java.util.Map; | ||
| import lombok.Value; | ||
| import org.opensearch.search.SearchHit; | ||
| import org.opensearch.search.aggregations.Aggregation; | ||
| import org.opensearch.search.aggregations.metrics.TopHits; | ||
|
|
||
| /** {@link TopHits} metric parser for MAX/MIN aggregations on text fields. */ | ||
| @Value | ||
| public class MaxMinParser implements MetricParser { | ||
dai-chen marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| String name; | ||
|
|
||
| @Override | ||
| public Map<String, Object> parse(Aggregation agg) { | ||
| TopHits topHits = (TopHits) agg; | ||
| SearchHit[] hits = topHits.getHits().getHits(); | ||
|
|
||
| if (hits.length == 0) { | ||
| return Collections.singletonMap(agg.getName(), null); | ||
| } | ||
|
|
||
| Map<String, Object> source = hits[0].getSourceAsMap(); | ||
|
|
||
| if (source.isEmpty()) { | ||
| return Collections.singletonMap(agg.getName(), null); | ||
| } else { | ||
| Object value = source.values().iterator().next(); | ||
| // Convert all values to strings to handle mixed types consistently with lexicographical | ||
| // sorting | ||
| String stringValue = value != null ? value.toString() : null; | ||
| return Collections.singletonMap(agg.getName(), stringValue); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we can use assertYamlEqualsJsonIgnoreId (refer #4274) Calcite plan readability.