diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/big5/CalcitePPLBig5IT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/big5/CalcitePPLBig5IT.java
index 665b3f0a874..f98b51d1c03 100644
--- a/integ-test/src/test/java/org/opensearch/sql/calcite/big5/CalcitePPLBig5IT.java
+++ b/integ-test/src/test/java/org/opensearch/sql/calcite/big5/CalcitePPLBig5IT.java
@@ -5,6 +5,8 @@
 
 package org.opensearch.sql.calcite.big5;
 
+import static org.opensearch.sql.util.MatcherUtils.assertYamlEqualsIgnoreId;
+
 import java.io.IOException;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
@@ -42,4 +44,13 @@ public void coalesce_nonexistent_field_fallback() throws IOException {
     String ppl = sanitize(loadExpectedQuery("coalesce_nonexistent_field_fallback.ppl"));
     timing(summary, "coalesce_nonexistent_field_fallback", ppl);
   }
+
+  /** Tests deduplication by metrics.size field with sorting by timestamp. */
+  @Test
+  public void dedup_metrics_size_field() throws IOException {
+    String ppl = sanitize(loadExpectedQuery("dedup_metrics_size_field.ppl"));
+    timing(summary, "dedup_metrics_size_field", ppl);
+    String expected = loadExpectedPlan("big5/dedup_metrics_size_field.yaml");
+    assertYamlEqualsIgnoreId(expected, explainQueryYaml(ppl));
+  }
 }
diff --git a/integ-test/src/test/resources/big5/queries/dedup_metrics_size_field.ppl b/integ-test/src/test/resources/big5/queries/dedup_metrics_size_field.ppl
new file mode 100644
index 00000000000..bb55d3a06ed
--- /dev/null
+++ b/integ-test/src/test/resources/big5/queries/dedup_metrics_size_field.ppl
@@ -0,0 +1,22 @@
+/*
+{
+  "name": "dedup_metrics_size_field",
+  "operation-type": "search",
+  "index": "{{index_name | default('big5')}}",
+  "body": {
+    "query": {
+      "exists": {
+        "field": "metrics.size",
+        "boost": 1.0
+      }
+    },
+    "_source": {
+      "includes": ["agent", "process", "log", "message", "tags", "cloud", "input", "@timestamp", "ecs", "data_stream", "meta", "host", "metrics", "metrics.size", "aws", "event"],
+      "excludes": []
+    }
+  }
+}
+*/
+source = big5
+| dedup metrics.size
+| sort - @timestamp
\ No newline at end of file
diff --git a/integ-test/src/test/resources/expectedOutput/calcite/big5/dedup_metrics_size_field.yaml b/integ-test/src/test/resources/expectedOutput/calcite/big5/dedup_metrics_size_field.yaml
new file mode 100644
index 00000000000..9ab1bb342c9
--- /dev/null
+++ b/integ-test/src/test/resources/expectedOutput/calcite/big5/dedup_metrics_size_field.yaml
@@ -0,0 +1,14 @@
+calcite:
+  logical: |
+    LogicalSystemLimit(sort0=[$7], dir0=[DESC-nulls-last], fetch=[10000], type=[QUERY_SIZE_LIMIT])
+      LogicalProject(agent=[$0], process=[$6], log=[$8], message=[$11], tags=[$12], cloud=[$13], input=[$15], @timestamp=[$17], ecs=[$18], data_stream=[$20], meta=[$24], host=[$26], metrics=[$27], aws=[$30], event=[$35])
+        LogicalSort(sort0=[$17], dir0=[DESC-nulls-last])
+          LogicalProject(agent=[$0], agent.ephemeral_id=[$1], agent.id=[$2], agent.name=[$3], agent.type=[$4], agent.version=[$5], process=[$6], process.name=[$7], log=[$8], log.file=[$9], log.file.path=[$10], message=[$11], tags=[$12], cloud=[$13], cloud.region=[$14], input=[$15], input.type=[$16], @timestamp=[$17], ecs=[$18], ecs.version=[$19], data_stream=[$20], data_stream.dataset=[$21], data_stream.namespace=[$22], data_stream.type=[$23], meta=[$24], meta.file=[$25], host=[$26], metrics=[$27], metrics.size=[$28], metrics.tmin=[$29], aws=[$30], aws.cloudwatch=[$31], aws.cloudwatch.ingestion_time=[$32], aws.cloudwatch.log_group=[$33], aws.cloudwatch.log_stream=[$34], event=[$35], event.dataset=[$36], event.id=[$37], event.ingested=[$38], _id=[$39], _index=[$40], _score=[$41], _maxscore=[$42], _sort=[$43], _routing=[$44])
+            LogicalFilter(condition=[<=($45, 1)])
+              LogicalProject(agent=[$0], agent.ephemeral_id=[$1], agent.id=[$2], agent.name=[$3], agent.type=[$4], agent.version=[$5], process=[$6], process.name=[$7], log=[$8], log.file=[$9], log.file.path=[$10], message=[$11], tags=[$12], cloud=[$13], cloud.region=[$14], input=[$15], input.type=[$16], @timestamp=[$17], ecs=[$18], ecs.version=[$19], data_stream=[$20], data_stream.dataset=[$21], data_stream.namespace=[$22], data_stream.type=[$23], meta=[$24], meta.file=[$25], host=[$26], metrics=[$27], metrics.size=[$28], metrics.tmin=[$29], aws=[$30], aws.cloudwatch=[$31], aws.cloudwatch.ingestion_time=[$32], aws.cloudwatch.log_group=[$33], aws.cloudwatch.log_stream=[$34], event=[$35], event.dataset=[$36], event.id=[$37], event.ingested=[$38], _id=[$39], _index=[$40], _score=[$41], _maxscore=[$42], _sort=[$43], _routing=[$44], _row_number_dedup_=[ROW_NUMBER() OVER (PARTITION BY $28)])
+                LogicalFilter(condition=[IS NOT NULL($28)])
+                  CalciteLogicalIndexScan(table=[[OpenSearch, big5]])
+  physical: |
+    EnumerableLimit(fetch=[10000])
+      EnumerableSort(sort0=[$7], dir0=[DESC-nulls-last])
+        CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=LogicalProject#,group={0},agg#0=LITERAL_AGG(1))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"metrics.size":{"terms":{"field":"metrics.size","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"$f1":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"_source":{"includes":["metrics.size","agent","agent.ephemeral_id","agent.id","agent.name","agent.type","agent.version","process","process.name","log","log.file","log.file.path","message","tags","cloud","cloud.region","input","input.type","@timestamp","ecs","ecs.version","data_stream","data_stream.dataset","data_stream.namespace","data_stream.type","meta","meta.file","host","metrics","metrics.tmin","aws","aws.cloudwatch","aws.cloudwatch.ingestion_time","aws.cloudwatch.log_group","aws.cloudwatch.log_stream","event","event.dataset","event.id","event.ingested","_id","_index","_score","_maxscore","_sort","_routing"],"excludes":[]},"script_fields":{}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
\ No newline at end of file
diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/data/utils/ObjectContent.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/data/utils/ObjectContent.java
index 9ca3deadddd..b409640c993 100644
--- a/opensearch/src/main/java/org/opensearch/sql/opensearch/data/utils/ObjectContent.java
+++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/data/utils/ObjectContent.java
@@ -141,7 +141,7 @@ public boolean isBoolean() {
 
   @Override
   public boolean isArray() {
-    return value instanceof ArrayNode;
+    return value instanceof ArrayNode || value instanceof List;
   }
 
   @Override
diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java
index 4e6032d4f1d..f31c4b67ae7 100644
--- a/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java
+++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/data/value/OpenSearchExprValueFactory.java
@@ -182,8 +182,7 @@ public ExprValue construct(String jsonString, boolean supportArrays) {
    * @return ExprValue
    */
   public ExprValue construct(String field, Object value, boolean supportArrays) {
-    Object extractedValue = extractFinalPrimitiveValue(value);
-    return parse(new ObjectContent(extractedValue), field, type(field), supportArrays);
+    return parse(new ObjectContent(value), field, type(field), supportArrays);
   }
 
   private ExprValue parse(
@@ -214,7 +213,9 @@ private ExprValue parse(
         || type == STRUCT) {
       return parseStruct(content, field, supportArrays);
     } else if (typeActionMap.containsKey(type)) {
-      return typeActionMap.get(type).apply(content, type);
+      return content.isArray()
+          ? parseArray(content, field, type, supportArrays)
+          : typeActionMap.get(type).apply(content, type);
     } else {
       throw new IllegalStateException(
           String.format(
@@ -581,26 +582,4 @@ private ExprValue parseInnerArrayValue(
   private String makeField(String path, String field) {
     return path.equalsIgnoreCase(TOP_PATH) ? field : String.join(".", path, field);
   }
-
-  /**
-   * Recursively extracts the final primitive value from nested Map structures. For example:
-   * {attributes={telemetry={sdk={language=java}}}} -> "java"
-   *
-   * @param value The value to extract from
-   * @return The extracted primitive value, or the original value if extraction is not possible
-   */
-  @SuppressWarnings("unchecked")
-  private Object extractFinalPrimitiveValue(Object value) {
-    if (value == null || !(value instanceof Map)) {
-      return value;
-    }
-
-    Map map = (Map) value;
-    if (map.size() == 1) {
-      Object singleValue = map.values().iterator().next();
-      return extractFinalPrimitiveValue(singleValue);
-    }
-
-    return value;
-  }
 }
diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java
index 28e4bc16001..5b64d8fc197 100644
--- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java
+++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java
@@ -93,7 +93,7 @@ protected void apply(
                         .filter(pair -> ((RexInputRef) pair.getKey()).getIndex() == i)
                         .map(Pair::getValue)
                         .findFirst()
-                        .get())
+                        .orElse(projectWithWindow.getInput().getRowType().getFieldNames().get(i)))
             .collect(Collectors.toList());
     if (dedupColumnIndices.size() != dedupColumnNames.size()) {
       return;
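
A minimal sketch (not part of the patch) of the behavior the ObjectContent.isArray() change above enables. It assumes the single-argument ObjectContent constructor already used by OpenSearchExprValueFactory.construct is public; the class name below is hypothetical.

import java.util.List;
import org.opensearch.sql.opensearch.data.utils.ObjectContent;

public class ObjectContentListSketch {
  public static void main(String[] args) {
    // Multi-valued fields deserialized from an OpenSearch hit can arrive as plain
    // java.util.List values rather than Jackson ArrayNode. With the isArray()
    // change above, such values are recognized as arrays, so construct(...) can
    // route them to parseArray(...) instead of the scalar typeActionMap handler.
    ObjectContent scalar = new ObjectContent(42);
    ObjectContent multiValued = new ObjectContent(List.of(100, 200));

    System.out.println(scalar.isArray());      // false
    System.out.println(multiValued.isArray()); // true (was false for List before this change)
  }
}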