Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.remote;

import static org.opensearch.sql.util.MatcherUtils.rows;
import static org.opensearch.sql.util.MatcherUtils.schema;
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
import static org.opensearch.sql.util.MatcherUtils.verifySchema;
import static org.opensearch.sql.util.MatcherUtils.verifySchemaInOrder;

import java.io.IOException;
import java.util.List;
import org.json.JSONObject;
import org.junit.jupiter.api.Test;
import org.opensearch.client.Request;
import org.opensearch.client.ResponseException;
import org.opensearch.sql.ppl.PPLIntegTestCase;

/**
* Integration tests for aggregation functions (MIN, MAX, FIRST, LAST, TAKE) with alias fields.
* Tests the fix for issue #4595.
*/
public class CalciteAliasFieldAggregationIT extends PPLIntegTestCase {

private static final String TEST_INDEX_ALIAS = "test_alias_bug";

@Override
public void init() throws Exception {
super.init();
enableCalcite();
createTestIndexWithAliasFields();
}

/**
* Create test index with alias fields mapping and insert sample data. This mirrors the
* reproduction steps from issue #4595.
*/
private void createTestIndexWithAliasFields() throws IOException {
// Delete the index if it exists (for test isolation)
try {
Request deleteIndex = new Request("DELETE", "/" + TEST_INDEX_ALIAS);
client().performRequest(deleteIndex);
} catch (ResponseException e) {
// Index doesn't exist, which is fine
}

// Create index with alias fields
Request createIndex = new Request("PUT", "/" + TEST_INDEX_ALIAS);
createIndex.setJsonEntity(
"{\n"
+ " \"mappings\": {\n"
+ " \"properties\": {\n"
+ " \"created_at\": {\"type\": \"date\"},\n"
+ " \"@timestamp\": {\"type\": \"alias\", \"path\": \"created_at\"},\n"
+ " \"value\": {\"type\": \"integer\"},\n"
+ " \"value_alias\": {\"type\": \"alias\", \"path\": \"value\"}\n"
+ " }\n"
+ " }\n"
+ "}");
client().performRequest(createIndex);

// Insert test documents
Request bulkRequest = new Request("POST", "/" + TEST_INDEX_ALIAS + "/_bulk?refresh=true");
bulkRequest.setJsonEntity(
"{\"index\":{}}\n"
+ "{\"created_at\": \"2024-01-01T10:00:00Z\", \"value\": 100}\n"
+ "{\"index\":{}}\n"
+ "{\"created_at\": \"2024-01-02T10:00:00Z\", \"value\": 200}\n"
+ "{\"index\":{}}\n"
+ "{\"created_at\": \"2024-01-03T10:00:00Z\", \"value\": 300}\n");
client().performRequest(bulkRequest);
}

@Test
public void testMinWithDateAliasField() throws IOException {
JSONObject actual =
executeQuery(String.format("source=%s | stats MIN(@timestamp)", TEST_INDEX_ALIAS));
verifySchema(actual, schema("MIN(@timestamp)", "timestamp"));
verifyDataRows(actual, rows("2024-01-01 10:00:00"));
}

@Test
public void testMaxWithDateAliasField() throws IOException {
JSONObject actual =
executeQuery(String.format("source=%s | stats MAX(@timestamp)", TEST_INDEX_ALIAS));
verifySchema(actual, schema("MAX(@timestamp)", "timestamp"));
verifyDataRows(actual, rows("2024-01-03 10:00:00"));
}

@Test
public void testMinMaxWithNumericAliasField() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | stats MIN(value_alias), MAX(value_alias)", TEST_INDEX_ALIAS));
verifySchemaInOrder(
actual, schema("MIN(value_alias)", "int"), schema("MAX(value_alias)", "int"));
verifyDataRows(actual, rows(100, 300));
}

@Test
public void testFirstWithAliasField() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | sort @timestamp | stats FIRST(@timestamp)", TEST_INDEX_ALIAS));
verifySchema(actual, schema("FIRST(@timestamp)", "timestamp"));
verifyDataRows(actual, rows("2024-01-01 10:00:00"));
}

@Test
public void testLastWithAliasField() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | sort @timestamp | stats LAST(@timestamp)", TEST_INDEX_ALIAS));
verifySchema(actual, schema("LAST(@timestamp)", "timestamp"));
verifyDataRows(actual, rows("2024-01-03 10:00:00"));
}

@Test
public void testTakeWithAliasField() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | sort @timestamp | stats TAKE(@timestamp, 2)", TEST_INDEX_ALIAS));
verifySchema(actual, schema("TAKE(@timestamp, 2)", "array"));
verifyDataRows(actual, rows(List.of("2024-01-01T10:00:00.000Z", "2024-01-02T10:00:00.000Z")));
}

@Test
public void testAggregationsWithOriginalFieldsStillWork() throws IOException {
JSONObject actual =
executeQuery(
String.format("source=%s | stats MIN(created_at), MAX(value)", TEST_INDEX_ALIAS));
verifySchemaInOrder(
actual, schema("MIN(created_at)", "timestamp"), schema("MAX(value)", "int"));
verifyDataRows(actual, rows("2024-01-01 10:00:00", 300));
}

@Test
public void testUnaffectedAggregationsWithAliasFields() throws IOException {
JSONObject actual =
executeQuery(
String.format(
"source=%s | stats SUM(value_alias), AVG(value_alias), COUNT(value_alias)",
TEST_INDEX_ALIAS));
verifySchemaInOrder(
actual,
schema("SUM(value_alias)", "bigint"),
schema("AVG(value_alias)", "double"),
schema("COUNT(value_alias)", "bigint"));
verifyDataRows(actual, rows(600, 200.0, 3));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -641,10 +641,10 @@ public void testExplainRegexMatchInEvalWithOutScriptPushdown() throws IOExceptio
// Only for Calcite
@Test
public void testExplainOnEarliestLatest() throws IOException {
String expected = loadExpectedPlan("explain_earliest_latest.json");
assertJsonEqualsIgnoreId(
String expected = loadExpectedPlan("explain_earliest_latest.yaml");
assertYamlEqualsIgnoreId(
expected,
explainQueryToString(
explainQueryYaml(
String.format(
"source=%s | stats earliest(message) as earliest_message, latest(message) as"
+ " latest_message by server",
Expand All @@ -654,10 +654,10 @@ public void testExplainOnEarliestLatest() throws IOException {
// Only for Calcite
@Test
public void testExplainOnEarliestLatestWithCustomTimeField() throws IOException {
String expected = loadExpectedPlan("explain_earliest_latest_custom_time.json");
assertJsonEqualsIgnoreId(
String expected = loadExpectedPlan("explain_earliest_latest_custom_time.yaml");
assertYamlEqualsIgnoreId(
expected,
explainQueryToString(
explainQueryYaml(
String.format(
"source=%s | stats earliest(message, created_at) as earliest_message,"
+ " latest(message, created_at) as latest_message by level",
Expand All @@ -667,10 +667,10 @@ public void testExplainOnEarliestLatestWithCustomTimeField() throws IOException
// Only for Calcite
@Test
public void testExplainOnFirstLast() throws IOException {
String expected = loadExpectedPlan("explain_first_last.json");
assertJsonEqualsIgnoreId(
String expected = loadExpectedPlan("explain_first_last.yaml");
assertYamlEqualsIgnoreId(
expected,
explainQueryToString(
explainQueryYaml(
String.format(
"source=%s | stats first(firstname) as first_name, last(firstname) as"
+ " last_name by gender",
Expand Down Expand Up @@ -866,18 +866,18 @@ public void testPushdownLimitIntoAggregation() throws IOException {

@Test
public void testExplainMaxOnStringField() throws IOException {
String expected = loadExpectedPlan("explain_max_string_field.json");
assertJsonEqualsIgnoreId(
String expected = loadExpectedPlan("explain_max_string_field.yaml");
assertYamlEqualsIgnoreId(
expected,
explainQueryToString("source=opensearch-sql_test_index_account | stats max(firstname)"));
explainQueryYaml("source=opensearch-sql_test_index_account | stats max(firstname)"));
}

@Test
public void testExplainMinOnStringField() throws IOException {
String expected = loadExpectedPlan("explain_min_string_field.json");
assertJsonEqualsIgnoreId(
String expected = loadExpectedPlan("explain_min_string_field.yaml");
assertYamlEqualsIgnoreId(
expected,
explainQueryToString("source=opensearch-sql_test_index_account | stats min(firstname)"));
explainQueryYaml("source=opensearch-sql_test_index_account | stats min(firstname)"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,10 +608,10 @@ public void testDifferentFilterScriptPushDownBehaviorExplain() throws Exception

@Test
public void testExplainOnTake() throws IOException {
String expected = loadExpectedPlan("explain_take.json");
assertJsonEqualsIgnoreId(
String expected = loadExpectedPlan("explain_take.yaml");
assertYamlEqualsIgnoreId(
expected,
explainQueryToString(
explainQueryYaml(
"source=opensearch-sql_test_index_account | stats take(firstname, 2) as take"));
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(earliest_message=[$1], latest_message=[$2], server=[$0])
LogicalAggregate(group=[{0}], earliest_message=[ARG_MIN($1, $2)], latest_message=[ARG_MAX($1, $2)])
LogicalProject(server=[$1], message=[$3], @timestamp=[$2])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])
physical: |
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},earliest_message=ARG_MIN($1, $2),latest_message=ARG_MAX($1, $2)), PROJECT->[earliest_message, latest_message, server], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"server":{"terms":{"field":"server","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"earliest_message":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"fields":[{"field":"message.keyword"}],"sort":[{"@timestamp":{"order":"asc"}}]}},"latest_message":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"fields":[{"field":"message.keyword"}],"sort":[{"@timestamp":{"order":"desc"}}]}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(earliest_message=[$1], latest_message=[$2], level=[$0])
LogicalAggregate(group=[{0}], earliest_message=[ARG_MIN($1, $2)], latest_message=[ARG_MAX($1, $2)])
LogicalProject(level=[$4], message=[$3], created_at=[$0])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])
physical: |
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},earliest_message=ARG_MIN($1, $2),latest_message=ARG_MAX($1, $2)), PROJECT->[earliest_message, latest_message, level], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"level":{"terms":{"field":"level","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"earliest_message":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"fields":[{"field":"message.keyword"}],"sort":[{"created_at":{"order":"asc"}}]}},"latest_message":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"fields":[{"field":"message.keyword"}],"sort":[{"created_at":{"order":"desc"}}]}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(first_name=[$1], last_name=[$2], gender=[$0])
LogicalAggregate(group=[{0}], first_name=[FIRST($1)], last_name=[LAST($1)])
LogicalProject(gender=[$4], firstname=[$1])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])
physical: |
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},first_name=FIRST($1),last_name=LAST($1)), PROJECT->[first_name, last_name, gender], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"gender":{"terms":{"field":"gender.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"first_name":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"fields":[{"field":"firstname"}]}},"last_name":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"fields":[{"field":"firstname"}],"sort":[{"_doc":{"order":"desc"}}]}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

This file was deleted.

Loading
Loading