Skip to content

Commit b39e7b6

Browse files
authored
Support construct AggregationResponseParser during Aggregator build stage (#108)
* Support construct AggregationResponseParser during Aggregator build stage * modify the doc Signed-off-by: penghuo <[email protected]>
1 parent 94a059a commit b39e7b6

21 files changed

+650
-253
lines changed

opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
import java.util.List;
6464
import java.util.Map;
6565
import java.util.function.Function;
66-
import lombok.AllArgsConstructor;
66+
import lombok.Getter;
6767
import lombok.Setter;
6868
import org.opensearch.common.time.DateFormatters;
6969
import org.opensearch.sql.data.model.ExprBooleanValue;
@@ -86,18 +86,22 @@
8686
import org.opensearch.sql.opensearch.data.utils.Content;
8787
import org.opensearch.sql.opensearch.data.utils.ObjectContent;
8888
import org.opensearch.sql.opensearch.data.utils.OpenSearchJsonContent;
89+
import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser;
8990

9091
/**
9192
* Construct ExprValue from OpenSearch response.
9293
*/
93-
@AllArgsConstructor
9494
public class OpenSearchExprValueFactory {
9595
/**
9696
* The Mapping of Field and ExprType.
9797
*/
9898
@Setter
9999
private Map<String, ExprType> typeMapping;
100100

101+
@Getter
102+
@Setter
103+
private OpenSearchAggregationResponseParser parser;
104+
101105
private static final DateTimeFormatter DATE_TIME_FORMATTER =
102106
new DateTimeFormatterBuilder()
103107
.appendOptional(SQL_LITERAL_DATE_TIME_FORMAT)
@@ -131,6 +135,14 @@ public class OpenSearchExprValueFactory {
131135
.put(OPENSEARCH_BINARY, c -> new OpenSearchExprBinaryValue(c.stringValue()))
132136
.build();
133137

138+
/**
139+
* Constructor of OpenSearchExprValueFactory.
140+
*/
141+
public OpenSearchExprValueFactory(
142+
Map<String, ExprType> typeMapping) {
143+
this.typeMapping = typeMapping;
144+
}
145+
134146
/**
135147
* The struct construction has the following assumption. 1. The field has OpenSearch Object
136148
* data type. https://www.elastic.co/guide/en/elasticsearch/reference/current/object.html 2. The

opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchAggregationResponseParser.java

Lines changed: 0 additions & 114 deletions
This file was deleted.

opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public boolean isAggregationResponse() {
103103
*/
104104
public Iterator<ExprValue> iterator() {
105105
if (isAggregationResponse()) {
106-
return OpenSearchAggregationResponseParser.parse(aggregations).stream().map(entry -> {
106+
return exprValueFactory.getParser().parse(aggregations).stream().map(entry -> {
107107
ImmutableMap.Builder<String, ExprValue> builder = new ImmutableMap.Builder<>();
108108
for (Map.Entry<String, Object> value : entry.entrySet()) {
109109
builder.put(value.getKey(), exprValueFactory.construct(value.getKey(), value.getValue()));
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License").
3+
* You may not use this file except in compliance with the License.
4+
* A copy of the License is located at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* or in the "license" file accompanying this file. This file is distributed
9+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
10+
* express or implied. See the License for the specific language governing
11+
* permissions and limitations under the License.
12+
*/
13+
14+
package org.opensearch.sql.opensearch.response.agg;
15+
16+
import java.util.Arrays;
17+
import java.util.HashMap;
18+
import java.util.List;
19+
import java.util.Map;
20+
import java.util.stream.Collectors;
21+
import org.opensearch.search.aggregations.Aggregations;
22+
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation;
23+
24+
/**
25+
* Composite Aggregation Parser which include composite aggregation and metric parsers.
26+
*/
27+
public class CompositeAggregationParser implements OpenSearchAggregationResponseParser {
28+
29+
private final MetricParserHelper metricsParser;
30+
31+
public CompositeAggregationParser(MetricParser... metricParserList) {
32+
metricsParser = new MetricParserHelper(Arrays.asList(metricParserList));
33+
}
34+
35+
public CompositeAggregationParser(List<MetricParser> metricParserList) {
36+
metricsParser = new MetricParserHelper(metricParserList);
37+
}
38+
39+
@Override
40+
public List<Map<String, Object>> parse(Aggregations aggregations) {
41+
return ((CompositeAggregation) aggregations.asList().get(0))
42+
.getBuckets().stream().map(this::parse).collect(Collectors.toList());
43+
}
44+
45+
private Map<String, Object> parse(CompositeAggregation.Bucket bucket) {
46+
Map<String, Object> resultMap = new HashMap<>();
47+
resultMap.putAll(bucket.getKey());
48+
resultMap.putAll(metricsParser.parse(bucket.getAggregations()));
49+
return resultMap;
50+
}
51+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License").
3+
* You may not use this file except in compliance with the License.
4+
* A copy of the License is located at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* or in the "license" file accompanying this file. This file is distributed
9+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
10+
* express or implied. See the License for the specific language governing
11+
* permissions and limitations under the License.
12+
*/
13+
14+
package org.opensearch.sql.opensearch.response.agg;
15+
16+
import java.util.Map;
17+
import lombok.Builder;
18+
import lombok.Getter;
19+
import org.opensearch.search.aggregations.Aggregation;
20+
import org.opensearch.search.aggregations.bucket.filter.Filter;
21+
22+
/**
23+
* {@link Filter} Parser.
24+
* The current use case is filter aggregation, e.g. avg(age) filter(balance>0). The filter parser
25+
* do nothing and return the result from metricsParser.
26+
*/
27+
@Builder
28+
public class FilterParser implements MetricParser {
29+
30+
private final MetricParser metricsParser;
31+
32+
@Getter private final String name;
33+
34+
@Override
35+
public Map<String, Object> parse(Aggregation aggregations) {
36+
return metricsParser.parse(((Filter) aggregations).getAggregations().asList().get(0));
37+
}
38+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License").
3+
* You may not use this file except in compliance with the License.
4+
* A copy of the License is located at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* or in the "license" file accompanying this file. This file is distributed
9+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
10+
* express or implied. See the License for the specific language governing
11+
* permissions and limitations under the License.
12+
*/
13+
14+
package org.opensearch.sql.opensearch.response.agg;
15+
16+
import java.util.Map;
17+
import org.opensearch.search.aggregations.Aggregation;
18+
19+
/**
20+
* Metric Aggregation Parser.
21+
*/
22+
public interface MetricParser {
23+
24+
/**
25+
* Get the name of metric parser.
26+
*/
27+
String getName();
28+
29+
/**
30+
* Parse the {@link Aggregation}.
31+
*
32+
* @param aggregation {@link Aggregation}
33+
* @return the map between metric name and metric value.
34+
*/
35+
Map<String, Object> parse(Aggregation aggregation);
36+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License").
3+
* You may not use this file except in compliance with the License.
4+
* A copy of the License is located at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* or in the "license" file accompanying this file. This file is distributed
9+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
10+
* express or implied. See the License for the specific language governing
11+
* permissions and limitations under the License.
12+
*/
13+
14+
package org.opensearch.sql.opensearch.response.agg;
15+
16+
import java.util.HashMap;
17+
import java.util.List;
18+
import java.util.Map;
19+
import java.util.stream.Collectors;
20+
import lombok.RequiredArgsConstructor;
21+
import org.opensearch.search.aggregations.Aggregation;
22+
import org.opensearch.search.aggregations.Aggregations;
23+
import org.opensearch.sql.common.utils.StringUtils;
24+
25+
/**
26+
* Parse multiple metrics in one bucket.
27+
*/
28+
@RequiredArgsConstructor
29+
public class MetricParserHelper {
30+
31+
private final Map<String, MetricParser> metricParserMap;
32+
33+
public MetricParserHelper(List<MetricParser> metricParserList) {
34+
metricParserMap =
35+
metricParserList.stream().collect(Collectors.toMap(MetricParser::getName, m -> m));
36+
}
37+
38+
/**
39+
* Parse {@link Aggregations}.
40+
*
41+
* @param aggregations {@link Aggregations}
42+
* @return the map between metric name and metric value.
43+
*/
44+
public Map<String, Object> parse(Aggregations aggregations) {
45+
Map<String, Object> resultMap = new HashMap<>();
46+
for (Aggregation aggregation : aggregations) {
47+
if (metricParserMap.containsKey(aggregation.getName())) {
48+
resultMap.putAll(metricParserMap.get(aggregation.getName()).parse(aggregation));
49+
} else {
50+
throw new RuntimeException(StringUtils.format("couldn't parse field %s in aggregation "
51+
+ "response", aggregation.getName()));
52+
}
53+
}
54+
return resultMap;
55+
}
56+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License").
3+
* You may not use this file except in compliance with the License.
4+
* A copy of the License is located at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* or in the "license" file accompanying this file. This file is distributed
9+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
10+
* express or implied. See the License for the specific language governing
11+
* permissions and limitations under the License.
12+
*/
13+
14+
package org.opensearch.sql.opensearch.response.agg;
15+
16+
import java.util.Arrays;
17+
import java.util.Collections;
18+
import java.util.List;
19+
import java.util.Map;
20+
import org.opensearch.search.aggregations.Aggregations;
21+
22+
/**
23+
* No Bucket Aggregation Parser which include only metric parsers.
24+
*/
25+
public class NoBucketAggregationParser implements OpenSearchAggregationResponseParser {
26+
27+
private final MetricParserHelper metricsParser;
28+
29+
public NoBucketAggregationParser(MetricParser... metricParserList) {
30+
metricsParser = new MetricParserHelper(Arrays.asList(metricParserList));
31+
}
32+
33+
public NoBucketAggregationParser(List<MetricParser> metricParserList) {
34+
metricsParser = new MetricParserHelper(metricParserList);
35+
}
36+
37+
@Override
38+
public List<Map<String, Object>> parse(Aggregations aggregations) {
39+
return Collections.singletonList(metricsParser.parse(aggregations));
40+
}
41+
}

0 commit comments

Comments
 (0)