diff --git a/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java b/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java index e3f5a32d3e6..db7919e4de2 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java @@ -344,8 +344,8 @@ public enum BuiltinFunctionName { .put("take", BuiltinFunctionName.TAKE) .put("percentile", BuiltinFunctionName.PERCENTILE_APPROX) .put("percentile_approx", BuiltinFunctionName.PERCENTILE_APPROX) - // .put("earliest", BuiltinFunctionName.EARLIEST) - // .put("latest", BuiltinFunctionName.LATEST) + .put("earliest", BuiltinFunctionName.EARLIEST) + .put("latest", BuiltinFunctionName.LATEST) .put("distinct_count_approx", BuiltinFunctionName.DISTINCT_COUNT_APPROX) .put("pattern", BuiltinFunctionName.INTERNAL_PATTERN) .build(); diff --git a/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java b/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java index 6d48a0dec43..2f3c2759621 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java @@ -827,7 +827,8 @@ void populate() { SqlStdOperatorTable.PLUS, PPLTypeChecker.family(SqlTypeFamily.NUMERIC, SqlTypeFamily.NUMERIC)); // Replace with a custom CompositeOperandTypeChecker to check both operands as - // SqlStdOperatorTable.ITEM.getOperandTypeChecker() checks only the first operand instead + // SqlStdOperatorTable.ITEM.getOperandTypeChecker() checks only the first + // operand instead // of all operands. registerOperator( INTERNAL_ITEM, @@ -841,14 +842,18 @@ void populate() { XOR, SqlStdOperatorTable.NOT_EQUALS, PPLTypeChecker.family(SqlTypeFamily.BOOLEAN, SqlTypeFamily.BOOLEAN)); - // SqlStdOperatorTable.CASE.getOperandTypeChecker is null. We manually create a type checker - // for it. The second and third operands are required to be of the same type. If not, - // it will throw an IllegalArgumentException with information Can't find leastRestrictive type + // SqlStdOperatorTable.CASE.getOperandTypeChecker is null. We manually create a + // type checker + // for it. The second and third operands are required to be of the same type. If + // not, + // it will throw an IllegalArgumentException with information Can't find + // leastRestrictive type registerOperator( IF, SqlStdOperatorTable.CASE, PPLTypeChecker.family(SqlTypeFamily.BOOLEAN, SqlTypeFamily.ANY, SqlTypeFamily.ANY)); - // Re-define the type checker for is not null, is present, and is null since their original + // Re-define the type checker for is not null, is present, and is null since + // their original // type checker ANY isn't compatible with struct types. registerOperator( IS_NOT_NULL, @@ -901,7 +906,8 @@ void populate() { (FunctionImp2) (builder, arg1, arg2) -> builder.makeCall(SqlLibraryOperators.STRCMP, arg2, arg1), PPLTypeChecker.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)); - // SqlStdOperatorTable.SUBSTRING.getOperandTypeChecker is null. We manually create a type + // SqlStdOperatorTable.SUBSTRING.getOperandTypeChecker is null. We manually + // create a type // checker for it. 
register( SUBSTRING, @@ -1051,6 +1057,21 @@ void registerOperator(BuiltinFunctionName functionName, SqlAggFunction aggFuncti register(functionName, handler, typeChecker); } + private static RexNode resolveTimeField(List argList, CalcitePlanContext ctx) { + if (argList.isEmpty()) { + // Try to find @timestamp field + var timestampField = + ctx.relBuilder.peek().getRowType().getField("@timestamp", false, false); + if (timestampField == null) { + throw new IllegalArgumentException( + "Default @timestamp field not found. Please specify a time field explicitly."); + } + return ctx.rexBuilder.makeInputRef(timestampField.getType(), timestampField.getIndex()); + } else { + return PlanUtils.derefMapCall(argList.get(0)); + } + } + void populate() { registerOperator(MAX, SqlStdOperatorTable.MAX); registerOperator(MIN, SqlStdOperatorTable.MIN); @@ -1089,6 +1110,24 @@ void populate() { extractTypeCheckerFromUDF(PPLBuiltinOperators.PERCENTILE_APPROX), PERCENTILE_APPROX.name(), false)); + + register( + EARLIEST, + (distinct, field, argList, ctx) -> { + RexNode timeField = resolveTimeField(argList, ctx); + return ctx.relBuilder.aggregateCall(SqlStdOperatorTable.ARG_MIN, field, timeField); + }, + wrapSqlOperandTypeChecker( + SqlStdOperatorTable.ARG_MIN.getOperandTypeChecker(), EARLIEST.name(), false)); + + register( + LATEST, + (distinct, field, argList, ctx) -> { + RexNode timeField = resolveTimeField(argList, ctx); + return ctx.relBuilder.aggregateCall(SqlStdOperatorTable.ARG_MAX, field, timeField); + }, + wrapSqlOperandTypeChecker( + SqlStdOperatorTable.ARG_MAX.getOperandTypeChecker(), LATEST.name(), false)); } } diff --git a/core/src/test/java/org/opensearch/sql/expression/function/AggFunctionTestBase.java b/core/src/test/java/org/opensearch/sql/expression/function/AggFunctionTestBase.java new file mode 100644 index 00000000000..d20841a2cee --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/expression/function/AggFunctionTestBase.java @@ -0,0 +1,114 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.expression.function; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.lang.reflect.Field; +import java.util.Map; +import java.util.Set; +import org.opensearch.sql.expression.function.PPLFuncImpTable.AggHandler; + +public abstract class AggFunctionTestBase { + + @SuppressWarnings("unchecked") + protected Map> + getAggFunctionRegistry() { + try { + PPLFuncImpTable funcTable = PPLFuncImpTable.INSTANCE; + Field field = PPLFuncImpTable.class.getDeclaredField("aggFunctionRegistry"); + field.setAccessible(true); + return (Map>) + field.get(funcTable); + } catch (Exception e) { + throw new RuntimeException("Failed to access aggFunctionRegistry", e); + } + } + + protected void assertFunctionIsRegistered(BuiltinFunctionName functionName) { + Map> registry = + getAggFunctionRegistry(); + assertTrue( + registry.containsKey(functionName), + functionName.getName().getFunctionName() + + " function should be registered in aggregate function registry"); + assertNotNull( + registry.get(functionName), + functionName.getName().getFunctionName() + " function handler should not be null"); + } + + protected void assertFunctionsAreRegistered(BuiltinFunctionName... functionNames) { + for (BuiltinFunctionName functionName : functionNames) { + assertFunctionIsRegistered(functionName); + } + } + + protected void assertFunctionHandlerTypes(BuiltinFunctionName... 
functionNames) { + Map> registry = + getAggFunctionRegistry(); + for (BuiltinFunctionName functionName : functionNames) { + org.apache.commons.lang3.tuple.Pair registryEntry = registry.get(functionName); + assertNotNull( + registryEntry, functionName.getName().getFunctionName() + " should be registered"); + + // Extract the AggHandler from the pair + AggHandler handler = registryEntry.getRight(); + + assertNotNull( + handler, functionName.getName().getFunctionName() + " handler should not be null"); + assertTrue( + handler instanceof AggHandler, + functionName.getName().getFunctionName() + + " handler should implement AggHandler interface"); + } + } + + protected void assertRegistryMinimumSize(int expectedMinimumSize) { + Map> registry = + getAggFunctionRegistry(); + assertTrue( + registry.size() >= expectedMinimumSize, + "Registry should contain at least " + expectedMinimumSize + " aggregate functions"); + } + + protected void assertKnownFunctionsPresent(Set knownFunctions) { + Map> registry = + getAggFunctionRegistry(); + long foundFunctions = registry.keySet().stream().filter(knownFunctions::contains).count(); + + assertTrue( + foundFunctions >= knownFunctions.size(), + "Should have at least " + knownFunctions.size() + " known aggregate functions registered"); + } + + protected void assertFunctionNameResolution( + String functionName, BuiltinFunctionName expectedEnum) { + assertTrue( + BuiltinFunctionName.of(functionName).isPresent(), + "Should be able to resolve '" + functionName + "' function name"); + assertTrue( + BuiltinFunctionName.of(functionName).get() == expectedEnum, + "Resolved function should match expected enum value"); + } + + protected void assertFunctionNamesInEnum(BuiltinFunctionName... functionNames) { + Set enumValues = Set.of(BuiltinFunctionName.values()); + + for (BuiltinFunctionName functionName : functionNames) { + assertTrue( + enumValues.contains(functionName), + functionName.getName().getFunctionName() + + " should be defined in BuiltinFunctionName enum"); + } + } + + protected void assertFunctionNameMapping(BuiltinFunctionName functionEnum, String expectedName) { + assertTrue( + functionEnum.getName().getFunctionName().equals(expectedName), + "Function enum should map to expected name: " + expectedName); + } +} diff --git a/core/src/test/java/org/opensearch/sql/expression/function/EarliestLatestAggFunctionTest.java b/core/src/test/java/org/opensearch/sql/expression/function/EarliestLatestAggFunctionTest.java new file mode 100644 index 00000000000..3e1ca244a65 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/expression/function/EarliestLatestAggFunctionTest.java @@ -0,0 +1,86 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.expression.function; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.Set; +import org.junit.jupiter.api.Test; + +public class EarliestLatestAggFunctionTest extends AggFunctionTestBase { + + @Test + void testEarliestFunctionIsRegistered() { + assertFunctionIsRegistered(BuiltinFunctionName.EARLIEST); + } + + @Test + void testLatestFunctionIsRegistered() { + assertFunctionIsRegistered(BuiltinFunctionName.LATEST); + } + + @Test + void testBuiltinFunctionNameMapping() { + assertFunctionNameMapping(BuiltinFunctionName.EARLIEST, "earliest"); + assertFunctionNameMapping(BuiltinFunctionName.LATEST, "latest"); + } + + @Test + void testFunctionNameResolution() { + assertFunctionNameResolution("earliest", BuiltinFunctionName.EARLIEST); + 
assertFunctionNameResolution("latest", BuiltinFunctionName.LATEST); + } + + @Test + void testResolveAggWithValidFunctions() { + try { + java.lang.reflect.Method method = + PPLFuncImpTable.class.getDeclaredMethod( + "resolveAgg", + BuiltinFunctionName.class, + boolean.class, + org.apache.calcite.rex.RexNode.class, + java.util.List.class, + org.opensearch.sql.calcite.CalcitePlanContext.class); + + assertNotNull(method, "resolveAgg method should exist"); + } catch (NoSuchMethodException e) { + throw new RuntimeException("resolveAgg method not found", e); + } + } + + @Test + void testFunctionRegistryIntegrity() { + assertFunctionsAreRegistered(BuiltinFunctionName.EARLIEST, BuiltinFunctionName.LATEST); + } + + @Test + void testFunctionHandlerTypes() { + assertFunctionHandlerTypes(BuiltinFunctionName.EARLIEST, BuiltinFunctionName.LATEST); + } + + @Test + void testFunctionRegistrySize() { + assertRegistryMinimumSize(10); + + Set knownFunctions = + Set.of( + BuiltinFunctionName.COUNT, + BuiltinFunctionName.SUM, + BuiltinFunctionName.AVG, + BuiltinFunctionName.MAX, + BuiltinFunctionName.MIN, + BuiltinFunctionName.EARLIEST, + BuiltinFunctionName.LATEST); + + assertKnownFunctionsPresent(knownFunctions); + } + + @Test + void testEarliestLatestFunctionNamesInEnum() { + assertFunctionNamesInEnum(BuiltinFunctionName.EARLIEST, BuiltinFunctionName.LATEST); + } +} diff --git a/docs/category.json b/docs/category.json index 4645b6c8318..47098ffc032 100644 --- a/docs/category.json +++ b/docs/category.json @@ -24,7 +24,6 @@ "user/ppl/cmd/rename.rst", "user/ppl/cmd/search.rst", "user/ppl/cmd/sort.rst", - "user/ppl/cmd/stats.rst", "user/ppl/cmd/syntax.rst", "user/ppl/cmd/trendline.rst", "user/ppl/cmd/top.rst", @@ -56,5 +55,8 @@ "user/dql/aggregations.rst", "user/dql/complex.rst", "user/dql/metadata.rst" + ], + "ppl_cli_calcite": [ + "user/ppl/cmd/stats.rst" ] } diff --git a/docs/user/dql/metadata.rst b/docs/user/dql/metadata.rst index 19696a0a3fc..9e55ab9885b 100644 --- a/docs/user/dql/metadata.rst +++ b/docs/user/dql/metadata.rst @@ -35,7 +35,7 @@ Example 1: Show All Indices Information SQL query:: os> SHOW TABLES LIKE '%' - fetched rows / total rows = 15/15 + fetched rows / total rows = 16/16 +----------------+-------------+------------------+------------+---------+----------+------------+-----------+---------------------------+----------------+ | TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_CAT | TYPE_SCHEM | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | |----------------+-------------+------------------+------------+---------+----------+------------+-----------+---------------------------+----------------| @@ -44,6 +44,7 @@ SQL query:: | docTestCluster | null | accounts | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | apache | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | books | BASE TABLE | null | null | null | null | null | null | + | docTestCluster | null | events | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | json_test | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | nested | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | nyc_taxi | BASE TABLE | null | null | null | null | null | null | diff --git a/docs/user/ppl/cmd/stats.rst b/docs/user/ppl/cmd/stats.rst index c15eeadd5d9..4988b5f00b2 100644 --- a/docs/user/ppl/cmd/stats.rst +++ b/docs/user/ppl/cmd/stats.rst @@ -71,6 +71,18 @@ stats ... 
[by-clause] | year (y) | +----------------------------+ +Configuration +============= +Some aggregation functions require Calcite to be enabled for proper functionality. To enable Calcite, use the following command: + +Enable Calcite:: + + >> curl -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings -d '{ + "persistent" : { + "plugins.calcite.enabled" : true + } + }' + Aggregation Functions ===================== @@ -310,6 +322,80 @@ Example:: | 36 | M | +---------------------+--------+ +EARLIEST +-------- + +Description +>>>>>>>>>>> + +Version: 3.3.0 + +Usage: EARLIEST(field [, time_field]). Return the earliest value of a field based on timestamp ordering. + +* field: mandatory. The field to return the earliest value for. +* time_field: optional. The field to use for time-based ordering. Defaults to @timestamp if not specified. + +Note: This function requires Calcite to be enabled (see `Configuration`_ section above). + +Example:: + + os> source=events | stats earliest(message) by host | sort host; + fetched rows / total rows = 2/2 + +-------------------+---------+ + | earliest(message) | host | + |-------------------+---------| + | Starting up | server1 | + | Initializing | server2 | + +-------------------+---------+ + +Example with custom time field:: + + os> source=events | stats earliest(status, event_time) by category | sort category; + fetched rows / total rows = 2/2 + +------------------------------+----------+ + | earliest(status, event_time) | category | + |------------------------------+----------| + | pending | orders | + | active | users | + +------------------------------+----------+ + +LATEST +------ + +Description +>>>>>>>>>>> + +Version: 3.3.0 + +Usage: LATEST(field [, time_field]). Return the latest value of a field based on timestamp ordering. + +* field: mandatory. The field to return the latest value for. +* time_field: optional. The field to use for time-based ordering. Defaults to @timestamp if not specified. + +Note: This function requires Calcite to be enabled (see `Configuration`_ section above). 
+ +Example:: + + os> source=events | stats latest(message) by host | sort host; + fetched rows / total rows = 2/2 + +------------------+---------+ + | latest(message) | host | + |------------------+---------| + | Shutting down | server1 | + | Maintenance mode | server2 | + +------------------+---------+ + +Example with custom time field:: + + os> source=events | stats latest(status, event_time) by category | sort category; + fetched rows / total rows = 2/2 + +----------------------------+----------+ + | latest(status, event_time) | category | + |----------------------------+----------| + | cancelled | orders | + | inactive | users | + +----------------------------+----------+ + Example 1: Calculate the count of events ======================================== @@ -532,4 +618,3 @@ PPL query:: | 28 | 20 | F | | 36 | 30 | M | +-----+----------+--------+ - diff --git a/doctest/build.gradle b/doctest/build.gradle index 4a72ebe4679..92b1a1c9e2c 100644 --- a/doctest/build.gradle +++ b/doctest/build.gradle @@ -81,17 +81,17 @@ task doctest(type: Exec, dependsOn: ['bootstrap']) { def docs = project.findProperty('docs') def verbose = project.findProperty('verbose') def endpoint = project.findProperty('endpoint') + def debug = project.findProperty('debug') + + // Set environment variables for debug mode + if (debug == 'true') { + environment 'DOCTEST_DEBUG', 'true' + } if (docs) { - // Single file or multiple files mode def args = ['.venv/bin/python', 'test_docs.py'] - // Handle multiple files (comma-separated) - if (docs.contains(',')) { - args.addAll(docs.split(',').collect { it.trim() }) - } else { - args.add(docs) - } + args.addAll(docs.split(',').collect { it.trim() }) if (verbose == 'true') { args.add('--verbose') @@ -109,11 +109,6 @@ task doctest(type: Exec, dependsOn: ['bootstrap']) { doLast { // remove the cloned sql-cli folder file("$projectDir/sql-cli").deleteDir() - if (docs) { - println("Single file doctest done") - } else { - println("Full doctest suite done") - } } } diff --git a/doctest/test_data/events.json b/doctest/test_data/events.json new file mode 100644 index 00000000000..e873691fb90 --- /dev/null +++ b/doctest/test_data/events.json @@ -0,0 +1,8 @@ +{"@timestamp":"2023-01-01T10:00:00Z","event_time":"2023-01-01T09:55:00Z","host":"server1","message":"Starting up","level":"INFO","category":"orders","status":"pending"} +{"@timestamp":"2023-01-01T10:05:00Z","event_time":"2023-01-01T10:00:00Z","host":"server2","message":"Initializing","level":"INFO","category":"users","status":"active"} +{"@timestamp":"2023-01-01T10:10:00Z","event_time":"2023-01-01T10:05:00Z","host":"server1","message":"Ready to serve","level":"INFO","category":"orders","status":"processing"} +{"@timestamp":"2023-01-01T10:15:00Z","event_time":"2023-01-01T10:10:00Z","host":"server2","message":"Ready","level":"INFO","category":"users","status":"inactive"} +{"@timestamp":"2023-01-01T10:20:00Z","event_time":"2023-01-01T10:15:00Z","host":"server1","message":"Processing requests","level":"INFO","category":"orders","status":"completed"} +{"@timestamp":"2023-01-01T10:25:00Z","event_time":"2023-01-01T10:20:00Z","host":"server2","message":"Handling connections","level":"INFO","category":"users","status":"pending"} +{"@timestamp":"2023-01-01T10:30:00Z","event_time":"2023-01-01T10:25:00Z","host":"server1","message":"Shutting down","level":"WARN","category":"orders","status":"cancelled"} +{"@timestamp":"2023-01-01T10:35:00Z","event_time":"2023-01-01T10:30:00Z","host":"server2","message":"Maintenance 
mode","level":"WARN","category":"users","status":"inactive"} diff --git a/doctest/test_docs.py b/doctest/test_docs.py index c1a3e889fb9..d79e5b1cf55 100644 --- a/doctest/test_docs.py +++ b/doctest/test_docs.py @@ -20,55 +20,51 @@ from opensearch_sql_cli.utils import OutputSettings from opensearchpy import OpenSearch, helpers -ENDPOINT = "http://localhost:9200" -ACCOUNTS = "accounts" -EMPLOYEES = "employees" -PEOPLE = "people" -ACCOUNT2 = "account2" -NYC_TAXI = "nyc_taxi" -BOOKS = "books" -APACHE = "apache" -WILDCARD = "wildcard" -NESTED = "nested" -DATASOURCES = ".ql-datasources" -WEBLOGS = "weblogs" -JSON_TEST = "json_test" -STATE_COUNTRY = "state_country" -OCCUPATION = "occupation" -WORKER = "worker" -WORK_INFORMATION = "work_information" - -class DocTestConnection(OpenSearchConnection): - - def __init__(self, query_language="sql"): - super(DocTestConnection, self).__init__(endpoint=ENDPOINT, query_language=query_language) - self.set_connection() - settings = OutputSettings(table_format="psql", is_vertical=False) - self.formatter = Formatter(settings) - - def process(self, statement): - data = self.execute_query(statement, use_console=False) - output = self.formatter.format_output(data) - output = "\n".join(output) - - click.echo(output) +ENDPOINT = "http://localhost:9200" +TEST_DATA = { + 'accounts': 'accounts.json', + 'people': 'people.json', + 'account2': 'account2.json', + 'nyc_taxi': 'nyc_taxi.json', + 'books': 'books.json', + 'apache': 'apache.json', + 'wildcard': 'wildcard.json', + 'nested': 'nested_objects.json', + '.ql-datasources': 'datasources.json', + 'weblogs': 'weblogs.json', + 'json_test': 'json_test.json', + 'state_country': 'state_country.json', + 'occupation': 'occupation.json', + 'worker': 'worker.json', + 'work_information': 'work_information.json', + 'events': 'events.json' +} + +DEBUG_MODE = os.environ.get('DOCTEST_DEBUG', 'false').lower() == 'true' + + +def debug(message): + if DEBUG_MODE: + print(f"[DEBUG] {message}", file=sys.stderr) + + +def detect_doc_type_from_path(file_path): + if '/ppl/' in file_path: + return 'ppl_cli' + elif '/sql/' in file_path or '/dql/' in file_path: + return 'sql_cli' + else: + return 'bash' -""" -For _explain requests, there are several additional request fields that will inconsistently -appear/change depending on underlying cluster state. This method normalizes these responses in-place -to make _explain doctests more consistent. -If the passed response is not an _explain response, the input is left unmodified. -""" def normalize_explain_response(data): if "root" in data: data = data["root"] if (request := data.get("description", {}).get("request", None)) and request.startswith("OpenSearchQueryRequest("): for filter_field in ["needClean", "pitId", "cursorKeepAlive", "searchAfter", "searchResponse"]: - # The value of PIT may contain `+=_-`. 
request = re.sub(f", {filter_field}=[A-Za-z0-9+=_-]+", "", request) data["description"]["request"] = request @@ -82,15 +78,158 @@ def pretty_print(s): try: data = json.loads(s) normalize_explain_response(data) - print(json.dumps(data, indent=2)) except json.decoder.JSONDecodeError: print(s) -sql_cmd = DocTestConnection(query_language="sql") -ppl_cmd = DocTestConnection(query_language="ppl") -test_data_client = OpenSearch([ENDPOINT], verify_certs=True) +def requires_calcite(doc_category): + return doc_category.endswith('_calcite') + + +class CategoryManager: + + def __init__(self, category_file_path='../docs/category.json'): + self._categories = self.load_categories(category_file_path) + self._all_docs_cache = None + + def load_categories(self, file_path): + try: + with open(file_path) as json_file: + categories = json.load(json_file) + debug(f"Loaded {len(categories)} categories from {file_path}") + return categories + except Exception as e: + raise Exception(f"Failed to load categories from {file_path}: {e}") + + def get_all_categories(self): + return list(self._categories.keys()) + + def get_category_files(self, category_name): + return self._categories.get(category_name, []) + + def get_all_docs(self): + if self._all_docs_cache is None: + self._all_docs_cache = [] + for category_name, docs in self._categories.items(): + self._all_docs_cache.extend(docs) + return self._all_docs_cache + + def find_file_category(self, file_path): + # Convert to relative path from docs root + if file_path.startswith('../docs/'): + rel_path = file_path[8:] # Remove '../docs/' prefix + else: + rel_path = file_path + + for category_name, docs in self._categories.items(): + if rel_path in docs: + debug(f"Found file {rel_path} in category {category_name}") + return category_name + + # Fallback to path-based detection + debug(f"File {rel_path} not found in categories, using path-based detection") + return detect_doc_type_from_path(file_path) + + def requires_calcite(self, category_name): + return category_name.endswith('_calcite') + + def get_setup_function(self, category_name): + if self.requires_calcite(category_name): + return set_up_test_indices_with_calcite + else: + return set_up_test_indices_without_calcite + + def get_parser_for_category(self, category_name): + if category_name.startswith('bash'): + return bash_parser + elif category_name.startswith('ppl_cli'): + return ppl_cli_parser + elif category_name.startswith('sql_cli'): + return sql_cli_parser + else: + # Default fallback + return sql_cli_parser + + def find_matching_files(self, search_filename): + if not search_filename.endswith('.rst'): + search_filename += '.rst' + + all_docs = self.get_all_docs() + matches = [doc for doc in all_docs if doc.endswith(search_filename)] + return matches + + +class DocTestConnection(OpenSearchConnection): + + def __init__(self, query_language="sql", endpoint=ENDPOINT): + super(DocTestConnection, self).__init__(endpoint, query_language=query_language) + self.set_connection() + settings = OutputSettings(table_format="psql", is_vertical=False) + self.formatter = Formatter(settings) + + def process(self, statement): + debug(f"Executing {self.query_language.upper()} query: {statement}") + + data = self.execute_query(statement, use_console=False) + debug(f"Query result: {data}") + + if data is None: + debug("Query returned None - this may indicate an error or unsupported function") + print("Error: Query returned no data") + return + + output = self.formatter.format_output(data) + output = "\n".join(output) + 
click.echo(output) + + +class CalciteManager: + + @staticmethod + def set_enabled(enabled): + import requests + calcite_settings = { + "transient": { + "plugins.calcite.enabled": enabled + } + } + response = requests.put(f"{ENDPOINT}/_plugins/_query/settings", + json=calcite_settings, + timeout=10) + + if response.status_code != 200: + raise Exception(f"Failed to set Calcite setting: {response.status_code} {response.text}") + +class TestDataManager: + + def __init__(self): + self.client = OpenSearch([ENDPOINT], verify_certs=True) + + def load_file(self, filename, index_name): + mapping_file_path = './test_mapping/' + filename + if os.path.isfile(mapping_file_path): + with open(mapping_file_path, 'r') as f: + self.client.indices.create(index=index_name, body=f.read()) + + data_file_path = './test_data/' + filename + def load_json(): + with open(data_file_path, 'r') as f: + for line in f: + yield json.loads(line) + + helpers.bulk(self.client, load_json(), stats_only=True, index=index_name, refresh='wait_for') + + def load_all_test_data(self): + for index_name, filename in TEST_DATA.items(): + if filename is not None: + self.load_file(filename, index_name) + else: + debug(f"Skipping index '{index_name}' - filename is None") + + def cleanup_indices(self): + indices_to_delete = list(TEST_DATA.keys()) + self.client.indices.delete(index=indices_to_delete, ignore_unavailable=True) def sql_cli_transform(s): @@ -102,7 +241,6 @@ def ppl_cli_transform(s): def bash_transform(s): - # TODO: add ppl support, be default cli uses sql if s.startswith("opensearchsql"): s = re.search(r"opensearchsql\s+-q\s+\"(.*?)\"", s).group(1) return u'cmd.process({0})'.format(repr(s.strip().rstrip(';'))) @@ -119,66 +257,46 @@ def bash_transform(s): ps1=r'sh\$', comment_prefix='#', transform=bash_transform) -def set_up_test_indices(test): - set_up(test) - load_file("accounts.json", index_name=ACCOUNTS) - load_file("people.json", index_name=PEOPLE) - load_file("account2.json", index_name=ACCOUNT2) - load_file("nyc_taxi.json", index_name=NYC_TAXI) - load_file("books.json", index_name=BOOKS) - load_file("apache.json", index_name=APACHE) - load_file("wildcard.json", index_name=WILDCARD) - load_file("nested_objects.json", index_name=NESTED) - load_file("datasources.json", index_name=DATASOURCES) - load_file("weblogs.json", index_name=WEBLOGS) - load_file("json_test.json", index_name=JSON_TEST) - load_file("state_country.json", index_name=STATE_COUNTRY) - load_file("occupation.json", index_name=OCCUPATION) - load_file("worker.json", index_name=WORKER) - load_file("work_information.json", index_name=WORK_INFORMATION) - - -def load_file(filename, index_name): - # Create index with the mapping if mapping file exists - mapping_file_path = './test_mapping/' + filename - if os.path.isfile(mapping_file_path): - with open(mapping_file_path, 'r') as f: - test_data_client.indices.create(index=index_name, body=f.read()) - - # generate iterable data - data_file_path = './test_data/' + filename - def load_json(): - with open(data_file_path, 'r') as f: - for line in f: - yield json.loads(line) - - # Need to enable refresh, because the load won't be visible to search immediately - # https://stackoverflow.com/questions/57840161/elasticsearch-python-bulk-helper-api-with-refresh - helpers.bulk(test_data_client, load_json(), stats_only=True, index=index_name, refresh='wait_for') - - -def set_up(test): - test.globs['sql_cmd'] = sql_cmd - test.globs['ppl_cmd'] = ppl_cmd +test_data_manager = None + + +def get_test_data_manager(): + global 
test_data_manager + if test_data_manager is None: + test_data_manager = TestDataManager() + return test_data_manager + + +def set_up_test_indices_with_calcite(test): + test.globs['sql_cmd'] = DocTestConnection(query_language="sql") + test.globs['ppl_cmd'] = DocTestConnection(query_language="ppl") + CalciteManager.set_enabled(True) + get_test_data_manager().load_all_test_data() + + +def set_up_test_indices_without_calcite(test): + test.globs['sql_cmd'] = DocTestConnection(query_language="sql") + test.globs['ppl_cmd'] = DocTestConnection(query_language="ppl") + CalciteManager.set_enabled(False) + get_test_data_manager().load_all_test_data() def tear_down(test): - # drop leftover tables after each test - test_data_client.indices.delete(index=[ACCOUNTS, EMPLOYEES, PEOPLE, ACCOUNT2, NYC_TAXI, BOOKS, APACHE, WILDCARD, NESTED, WEBLOGS, JSON_TEST, STATE_COUNTRY, OCCUPATION, WORKER, WORK_INFORMATION], ignore_unavailable=True) + get_test_data_manager().cleanup_indices() docsuite = partial(doctest.DocFileSuite, tearDown=tear_down, - parser=sql_cli_parser, optionflags=doctest.NORMALIZE_WHITESPACE | doctest.ELLIPSIS, encoding='utf-8') -doctest_file = partial(os.path.join, '../docs') +def get_doc_filepath(item): + return os.path.join('../docs', item) -def doctest_files(items): - return (doctest_file(item) for item in items) +def get_doc_filepaths(items): + return (get_doc_filepath(item) for item in items) class DocTests(unittest.TestSuite): @@ -186,11 +304,11 @@ def run(self, result, debug=False): super().run(result, debug) -def doc_suite(fn): +def create_bash_suite(filepaths, setup_func): return docsuite( - fn, + *filepaths, parser=bash_parser, - setUp=set_up_test_indices, + setUp=setup_func, globs={ 'sh': partial( subprocess.run, @@ -205,257 +323,65 @@ def doc_suite(fn): ) +def create_cli_suite(filepaths, parser, setup_func): + return docsuite( + *filepaths, + parser=parser, + setUp=setup_func + ) + +# Entry point for unittest discovery def load_tests(loader, suite, ignore): tests = [] - # Load doctest docs by category - with open('../docs/category.json') as json_file: - category = json.load(json_file) - - bash_docs = category['bash'] - ppl_cli_docs = category['ppl_cli'] - sql_cli_docs = category['sql_cli'] - - # docs with bash-based examples - for fn in doctest_files(bash_docs): - tests.append(doc_suite(fn)) - - # docs with sql-cli based examples - # TODO: add until the migration to new architecture is done, then we have an artifact including ppl and sql both - # for fn in doctest_files('sql/basics.rst'): - # tests.append(docsuite(fn, setUp=set_up_accounts)) - for fn in doctest_files(sql_cli_docs): - tests.append( - docsuite( - fn, - parser=sql_cli_parser, - setUp=set_up_test_indices - ) - ) - - # docs with ppl-cli based examples - for fn in doctest_files(ppl_cli_docs): - tests.append( - docsuite( - fn, - parser=ppl_cli_parser, - setUp=set_up_test_indices - ) - ) - - # randomize order of tests to make sure they don't depend on each other - random.shuffle(tests) - - return DocTests(tests) - - -# Single file doctest functionality -def find_doc_file(filename_or_path): - """Find documentation file by name or return the path if it's already a full path""" - # If it's already a full path that exists, return it - if os.path.exists(filename_or_path): - return filename_or_path - - # If it's just a filename, search for it in the docs directory - if not os.path.sep in filename_or_path: - try: - with open('../docs/category.json') as json_file: - category = json.load(json_file) - - # Search in all categories - 
all_docs = category['bash'] + category['ppl_cli'] + category['sql_cli'] - - # Add .rst extension if not present - search_filename = filename_or_path - if not search_filename.endswith('.rst'): - search_filename += '.rst' - - # Find files that end with the given filename - matches = [doc for doc in all_docs if doc.endswith(search_filename)] - - if len(matches) == 1: - found_path = f"../docs/{matches[0]}" - print(f"Found: {found_path}") - return found_path - elif len(matches) > 1: - print(f"Multiple files found matching '{search_filename}':") - for match in matches: - print(f" ../docs/{match}") - print("Please specify the full path or a more specific filename.") - return None - else: - print(f"No documentation file found matching '{search_filename}'") - print("Use --list to see all available files") - return None - - except Exception as e: - print(f"Error searching for file: {e}") - return None + category_manager = CategoryManager() - # If it's a relative path, try to find it - if not filename_or_path.startswith('../docs/'): - potential_path = f"../docs/{filename_or_path}" - if os.path.exists(potential_path): - return potential_path - - return filename_or_path + for category_name in category_manager.get_all_categories(): + docs = category_manager.get_category_files(category_name) + if not docs: + continue + tests.append(get_test_suite(category_manager, category_name, get_doc_filepaths(docs))) -def determine_doc_type(file_path): - """Determine the type of documentation file based on category.json""" - try: - with open('../docs/category.json') as json_file: - category = json.load(json_file) - - # Convert absolute path to relative path from docs directory - rel_path = os.path.relpath(file_path, '../docs') - - if rel_path in category['bash']: - return 'bash' - elif rel_path in category['ppl_cli']: - return 'ppl_cli' - elif rel_path in category['sql_cli']: - return 'sql_cli' - else: - # Try to guess based on file path - if '/ppl/' in file_path: - return 'ppl_cli' - elif '/sql/' in file_path or '/dql/' in file_path: - return 'sql_cli' - else: - return 'bash' # default fallback - except Exception as e: - print(f"Warning: Could not determine doc type from category.json: {e}") - # Fallback to path-based detection - if '/ppl/' in file_path: - return 'ppl_cli' - elif '/sql/' in file_path or '/dql/' in file_path: - return 'sql_cli' - else: - return 'bash' + random.shuffle(tests) + return DocTests(tests) +def get_test_suite(category_manager: CategoryManager, category_name, filepaths): + setup_func = category_manager.get_setup_function(category_name) -def run_single_doctest(file_path, verbose=False, endpoint=None): - """Run doctest for a single documentation file""" - - if not os.path.exists(file_path): - print(f"Error: File {file_path} does not exist") - return False - - # Update endpoint if provided - if endpoint: - global ENDPOINT - ENDPOINT = endpoint - print(f"Using custom endpoint: {endpoint}") - - doc_type = determine_doc_type(file_path) - print(f"Detected doc type: {doc_type}") - print(f"Running doctest for: {file_path}") + if category_name.startswith('bash'): + return create_bash_suite(filepaths, setup_func) + else: + parser = category_manager.get_parser_for_category(category_name) + return create_cli_suite(filepaths, parser, setup_func) + +def list_available_docs(category_manager: CategoryManager): + categories = category_manager.get_all_categories() - # Configure doctest options - optionflags = doctest.NORMALIZE_WHITESPACE | doctest.ELLIPSIS - if verbose: - optionflags |= doctest.REPORT_NDIFF + 
print(f"Available documentation files for testing:\n") - # Choose appropriate parser and setup based on doc type - if doc_type == 'bash': - parser = bash_parser - setup_func = set_up_test_indices - globs = { - 'sh': partial( - subprocess.run, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - timeout=60, - shell=True - ), - 'pretty_print': pretty_print - } - elif doc_type == 'ppl_cli': - parser = ppl_cli_parser - setup_func = set_up_test_indices - globs = {} - else: # sql_cli - parser = sql_cli_parser - setup_func = set_up_test_indices - globs = {} + total = 0 + for category_name in categories.items(): + files = category_manager.get_category_files(category_name) + total += len(files) + print(f"{category_name} docs ({len(files)} files):\n") + for doc in sorted(files): + print(f" ../docs/{doc}\n") - try: - print("Setting up test environment...") - - # Create and run the doctest suite - suite = doctest.DocFileSuite( - file_path, - parser=parser, - setUp=setup_func, - tearDown=tear_down, - optionflags=optionflags, - encoding='utf-8', - globs=globs - ) - - # Run the test - runner = unittest.TextTestRunner(verbosity=2 if verbose else 1) - result = runner.run(suite) - - # Print summary - if result.wasSuccessful(): - print(f"\nSUCCESS: All tests in {os.path.basename(file_path)} passed!") - print(f"Tests run: {result.testsRun}, Failures: {len(result.failures)}, Errors: {len(result.errors)}") - return True - else: - print(f"\nFAILED: {len(result.failures + result.errors)} test(s) failed in {os.path.basename(file_path)}") - print(f"Tests run: {result.testsRun}, Failures: {len(result.failures)}, Errors: {len(result.errors)}") - - if verbose: - print("\nDetailed failure information:") - for failure in result.failures: - print(f"\n--- FAILURE in {failure[0]} ---") - print(failure[1]) - for error in result.errors: - print(f"\n--- ERROR in {error[0]} ---") - print(error[1]) - else: - print("Use --verbose for detailed failure information") - - return False - - except Exception as e: - print(f"Error running doctest: {e}") - if verbose: - import traceback - traceback.print_exc() - return False + print(f"Total: {total} documentation files available for testing\n") -def list_available_docs(): - """List all available documentation files that can be tested""" - try: - with open('../docs/category.json') as json_file: - category = json.load(json_file) - - print("Available documentation files for testing:") - print(f"\nBash-based docs ({len(category['bash'])} files):") - for doc in sorted(category['bash']): - print(f" ../docs/{doc}") - - print(f"\nPPL CLI docs ({len(category['ppl_cli'])} files):") - for doc in sorted(category['ppl_cli']): - print(f" ../docs/{doc}") - - print(f"\nSQL CLI docs ({len(category['sql_cli'])} files):") - for doc in sorted(category['sql_cli']): - print(f" ../docs/{doc}") - - total_docs = len(category['bash']) + len(category['ppl_cli']) + len(category['sql_cli']) - print(f"\nTotal: {total_docs} documentation files available for testing") - - except Exception as e: - print(f"Error reading category.json: {e}") +def resolve_files(category_manager: CategoryManager, file_paths: list[str]): + result = [] + for file_param in file_paths: + resolved_files = category_manager.find_matching_files(file_param) + # add each file if not already in the list + for f in resolved_files: + if f not in result: + result.append(f) + return result def main(): - """Main entry point for single file testing""" parser = argparse.ArgumentParser( description="Run doctest for one or more 
documentation files, or all files if no arguments provided", formatter_class=argparse.RawDescriptionHelpFormatter, @@ -464,13 +390,15 @@ def main(): python test_docs.py # Run all tests (default behavior) python test_docs.py stats # Run single file (extension optional) python test_docs.py stats.rst --verbose - python test_docs.py stats fields basics + python test_docs.py stats fields basics # Run multiple files + python test_docs.py cmd # Run all files matching 'cmd' (if multiple found) python test_docs.py ../docs/user/ppl/cmd/stats.rst --endpoint http://localhost:9201 Performance Tips: - Use --verbose for detailed debugging information - Ensure OpenSearch is running on the specified endpoint before testing - Extension .rst can be omitted for convenience + - If a filename matches multiple files, all matches will be executed """ ) @@ -483,55 +411,34 @@ def main(): help='List all available documentation files') args = parser.parse_args() - + category_manager = CategoryManager() + if args.list: - list_available_docs() + list_available_docs(category_manager) return - # If no file paths provided, run the default unittest behavior if not args.file_paths: print("No specific files provided. Running full doctest suite...") - # Run the standard unittest discovery unittest.main(module=None, argv=['test_docs.py'], exit=False) return - - # Single file testing mode + + if args.endpoint: + global ENDPOINT + ENDPOINT = endpoint + print(f"Using custom endpoint: {endpoint}") + + all_files_to_test = resolve_files(category_manager, args.file_paths) + + runner = unittest.TextTestRunner(verbosity=2 if args.verbose else 1) + all_success = True - total_files = len(args.file_paths) - - for i, file_path in enumerate(args.file_paths, 1): - if total_files > 1: - print(f"\n{'='*60}") - print(f"Testing file {i}/{total_files}: {file_path}") - print('='*60) - - # Find the actual file path (handles both full paths and just filenames) - actual_file_path = find_doc_file(file_path) - if not actual_file_path: - print(f"Skipping {file_path} - file not found") - all_success = False - continue - - success = run_single_doctest( - actual_file_path, - verbose=args.verbose, - endpoint=args.endpoint - ) - - if not success: - all_success = False - - if total_files > 1: - print(f"\n{'='*60}") - print(f"SUMMARY: Tested {total_files} files") - if all_success: - print("All tests passed!") - else: - print("Some tests failed!") - print('='*60) - - sys.exit(0 if all_success else 1) + for file_path in all_files_to_test: + doc_type = category_manager.find_file_category(file_path) + suite = get_test_suite(category_manager, doc_type, [get_doc_filepath(file_path)]) + result = runner.run(suite) + all_success = all_success and result.wasSuccessful() + sys.exit(0 if all_success else 1) if __name__ == '__main__': main() diff --git a/doctest/test_mapping/events.json b/doctest/test_mapping/events.json new file mode 100644 index 00000000000..664f042324b --- /dev/null +++ b/doctest/test_mapping/events.json @@ -0,0 +1,29 @@ +{ + "mappings": { + "properties": { + "@timestamp": { + "type": "date", + "format": "strict_date_optional_time||epoch_millis" + }, + "event_time": { + "type": "date", + "format": "strict_date_optional_time||epoch_millis" + }, + "host": { + "type": "keyword" + }, + "message": { + "type": "text" + }, + "level": { + "type": "keyword" + }, + "category": { + "type": "keyword" + }, + "status": { + "type": "keyword" + } + } + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java 
b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index 7193467fd86..264a76d1b12 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -6,6 +6,7 @@ package org.opensearch.sql.calcite.remote; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_LOGS; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_NESTED_SIMPLE; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STRINGS; import static org.opensearch.sql.util.MatcherUtils.assertJsonEqualsIgnoreId; @@ -25,6 +26,7 @@ public void init() throws Exception { loadIndex(Index.BANK_WITH_STRING_VALUES); loadIndex(Index.NESTED_SIMPLE); loadIndex(Index.TIME_TEST_DATA); + loadIndex(Index.LOGS); } @Override @@ -337,6 +339,32 @@ public void testExplainRegexMatchInEvalWithOutScriptPushdown() throws IOExceptio assertJsonEqualsIgnoreId(expected, result); } + // Only for Calcite + @Test + public void testExplainOnEarliestLatest() throws IOException { + String expected = loadExpectedPlan("explain_earliest_latest.json"); + assertJsonEqualsIgnoreId( + expected, + explainQueryToString( + String.format( + "source=%s | stats earliest(message) as earliest_message, latest(message) as" + + " latest_message by server", + TEST_INDEX_LOGS))); + } + + // Only for Calcite + @Test + public void testExplainOnEarliestLatestWithCustomTimeField() throws IOException { + String expected = loadExpectedPlan("explain_earliest_latest_custom_time.json"); + assertJsonEqualsIgnoreId( + expected, + explainQueryToString( + String.format( + "source=%s | stats earliest(message, created_at) as earliest_message," + + " latest(message, created_at) as latest_message by level", + TEST_INDEX_LOGS))); + } + /** * Executes the PPL query and returns the result as a string with windows-style line breaks * replaced with Unix-style ones. 
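For context on the assertions in the aggregation tests below: the new `earliest`/`latest` aggregates are registered onto Calcite's `ARG_MIN`/`ARG_MAX`, i.e. they return the value of one field taken from the row whose time field is smallest or largest (defaulting to `@timestamp`). The following standalone sketch is not part of this patch; the class name is illustrative, and the sample rows mirror `integ-test/src/test/resources/logs.json` added later in this PR. Its output matches the results asserted by `CalcitePPLAggregationIT`.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Illustration only (not part of this patch): the semantics that EARLIEST/LATEST
 * implement via Calcite's ARG_MIN/ARG_MAX — return one field's value from the row
 * whose time field is minimal (earliest) or maximal (latest).
 */
public class EarliestLatestSemanticsDemo {
  // Mirrors the documents bulk-loaded from logs.json.
  record LogRow(String server, String level, String message, Instant timestamp) {}

  public static void main(String[] args) {
    List<LogRow> logs = List.of(
        new LogRow("server1", "ERROR", "Database connection failed", Instant.parse("2023-01-01T00:00:00Z")),
        new LogRow("server2", "INFO", "Service started", Instant.parse("2023-01-02T00:00:00Z")),
        new LogRow("server1", "WARN", "High memory usage", Instant.parse("2023-01-03T00:00:00Z")),
        new LogRow("server3", "ERROR", "Disk space low", Instant.parse("2023-01-04T00:00:00Z")),
        new LogRow("server2", "INFO", "Backup completed", Instant.parse("2023-01-05T00:00:00Z")));

    // stats earliest(server), latest(server)
    //   ~ ARG_MIN(server, @timestamp), ARG_MAX(server, @timestamp)
    String earliestServer = logs.stream().min(Comparator.comparing(LogRow::timestamp)).orElseThrow().server();
    String latestServer = logs.stream().max(Comparator.comparing(LogRow::timestamp)).orElseThrow().server();
    System.out.println(earliestServer + " / " + latestServer); // server1 / server2

    // stats latest(server) as late, earliest(server) as early by level
    Map<String, List<LogRow>> byLevel = logs.stream().collect(Collectors.groupingBy(LogRow::level));
    byLevel.forEach((level, rows) -> {
      String early = rows.stream().min(Comparator.comparing(LogRow::timestamp)).orElseThrow().server();
      String late = rows.stream().max(Comparator.comparing(LogRow::timestamp)).orElseThrow().server();
      // ERROR: early=server1, late=server3; INFO: server2/server2; WARN: server1/server1
      System.out.println(level + ": early=" + early + ", late=" + late);
    });
  }
}
```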
diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java index 8b1cca8bcdf..ae45fc86930 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java @@ -10,6 +10,7 @@ import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_CALCS; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_DATATYPE_NUMERIC; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_DATE_FORMATS; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_LOGS; import static org.opensearch.sql.util.MatcherUtils.assertJsonEquals; import static org.opensearch.sql.util.MatcherUtils.rows; import static org.opensearch.sql.util.MatcherUtils.schema; @@ -21,7 +22,6 @@ import java.util.Arrays; import java.util.List; import org.json.JSONObject; -import org.junit.Ignore; import org.junit.jupiter.api.Test; import org.opensearch.client.Request; import org.opensearch.sql.ppl.PPLIntegTestCase; @@ -38,6 +38,7 @@ public void init() throws Exception { loadIndex(Index.CALCS); loadIndex(Index.DATE_FORMATS); loadIndex(Index.DATA_TYPE_NUMERIC); + loadIndex(Index.LOGS); } @Test @@ -560,67 +561,43 @@ public void testCountDistinctWithAlias() throws IOException { verifyDataRows(actual, rows(3, "F"), rows(4, "M")); } - @Ignore @Test public void testEarliestAndLatest() throws IOException { JSONObject actual = executeQuery( - String.format( - "source=%s | stats latest(datetime0), earliest(datetime0)", TEST_INDEX_CALCS)); + String.format("source=%s | stats latest(server), earliest(server)", TEST_INDEX_LOGS)); - verifySchema( - actual, - schema("latest(datetime0)", "timestamp"), - schema("earliest(datetime0)", "timestamp")); - verifyDataRows(actual, rows("2004-08-02 07:59:23", "2004-07-04 22:49:28")); + verifySchema(actual, schema("latest(server)", "string"), schema("earliest(server)", "string")); + verifyDataRows(actual, rows("server2", "server1")); } - @Ignore @Test public void testEarliestAndLatestWithAlias() throws IOException { JSONObject actual = executeQuery( String.format( - "source=%s | stats latest(datetime0) as late, earliest(datetime0) as early", - TEST_INDEX_CALCS)); + "source=%s | stats latest(server) as late, earliest(server) as early", + TEST_INDEX_LOGS)); - verifySchema(actual, schema("late", "timestamp"), schema("early", "timestamp")); - verifyDataRows(actual, rows("2004-08-02 07:59:23", "2004-07-04 22:49:28")); + verifySchema(actual, schema("late", "string"), schema("early", "string")); + verifyDataRows(actual, rows("server2", "server1")); } - @Ignore @Test public void testEarliestAndLatestWithBy() throws IOException { JSONObject actual = executeQuery( String.format( - "source=%s | stats latest(datetime0) as late, earliest(datetime0) as early by" - + " bool2", - TEST_INDEX_CALCS)); + "source=%s | stats latest(server) as late, earliest(server) as early by" + " level", + TEST_INDEX_LOGS)); verifySchema( - actual, - schema("late", "timestamp"), - schema("early", "timestamp"), - schema("bool2", "boolean")); + actual, schema("late", "string"), schema("early", "string"), schema("level", "string")); verifyDataRows( actual, - rows("2004-07-31 11:57:52", "2004-07-12 17:30:16", true), - rows("2004-08-02 07:59:23", "2004-07-04 22:49:28", false)); - } - - @Ignore - @Test - public void testEarliestAndLatestWithTimeBy() throws IOException { 
- JSONObject actual = - executeQuery( - String.format( - "source=%s | stats latest(time1) as late, earliest(time1) as early by" + " bool2", - TEST_INDEX_CALCS)); - - verifySchema( - actual, schema("late", "time"), schema("early", "time"), schema("bool2", "boolean")); - verifyDataRows(actual, rows("19:57:33", "04:40:49", true), rows("22:50:16", "00:05:57", false)); + rows("server3", "server1", "ERROR"), + rows("server2", "server2", "INFO"), + rows("server1", "server1", "WARN")); } @Test diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index f867a69e953..01a6a7d14e8 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -32,6 +32,7 @@ import static org.opensearch.sql.legacy.TestUtils.getJoinTypeIndexMapping; import static org.opensearch.sql.legacy.TestUtils.getJsonTestIndexMapping; import static org.opensearch.sql.legacy.TestUtils.getLocationIndexMapping; +import static org.opensearch.sql.legacy.TestUtils.getLogsIndexMapping; import static org.opensearch.sql.legacy.TestUtils.getMappingFile; import static org.opensearch.sql.legacy.TestUtils.getNestedSimpleIndexMapping; import static org.opensearch.sql.legacy.TestUtils.getNestedTypeIndexMapping; @@ -895,6 +896,11 @@ public enum Index { "hdfs_logs", getHdfsLogsIndexMapping(), "src/test/resources/hdfs_logs.json"), + LOGS( + TestsConstants.TEST_INDEX_LOGS, + "logs", + getLogsIndexMapping(), + "src/test/resources/logs.json"), TIME_TEST_DATA( "opensearch-sql_test_index_time_data", "time_data", diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java b/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java index 37a35a7c052..6f605837196 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java @@ -305,6 +305,11 @@ public static String getHdfsLogsIndexMapping() { return getMappingFile(mappingFile); } + public static String getLogsIndexMapping() { + String mappingFile = "logs_index_mapping.json"; + return getMappingFile(mappingFile); + } + public static void loadBulk(Client client, String jsonPath, String defaultIndex) throws Exception { System.out.println(String.format("Loading file %s into opensearch cluster", jsonPath)); diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java index 9e9e0698c93..fd017d3ee61 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java @@ -79,6 +79,7 @@ public class TestsConstants { public static final String TEST_INDEX_MERGE_TEST_WILDCARD = TEST_INDEX + "_merge_test_*"; public static final String TEST_INDEX_ARRAY = TEST_INDEX + "_array"; public static final String TEST_INDEX_HDFS_LOGS = TEST_INDEX + "_hdfs_logs"; + public static final String TEST_INDEX_LOGS = TEST_INDEX + "_logs"; public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; public static final String TS_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"; diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/NewAddedCommandsIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/NewAddedCommandsIT.java index ab2d4113f80..ac5c9f0ce56 100644 --- 
a/integ-test/src/test/java/org/opensearch/sql/ppl/NewAddedCommandsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/NewAddedCommandsIT.java @@ -8,7 +8,8 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.opensearch.sql.common.setting.Settings.Key.CALCITE_ENGINE_ENABLED; -import static org.opensearch.sql.legacy.TestsConstants.*; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_DOG; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STRINGS; import java.io.IOException; diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_earliest_latest.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_earliest_latest.json new file mode 100644 index 00000000000..46143e0c429 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_earliest_latest.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(earliest_message=[$1], latest_message=[$2], server=[$0])\n LogicalAggregate(group=[{0}], earliest_message=[ARG_MIN($1, $2)], latest_message=[ARG_MAX($1, $2)])\n LogicalProject(server=[$1], message=[$3], @timestamp=[$2])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n", + "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..2=[{inputs}], earliest_message=[$t1], latest_message=[$t2], server=[$t0])\n EnumerableAggregate(group=[{0}], earliest_message=[ARG_MIN($1, $2)], latest_message=[ARG_MAX($1, $2)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[PROJECT->[server, message, @timestamp]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"server\",\"message\",\"@timestamp\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" + } +} diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_earliest_latest_custom_time.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_earliest_latest_custom_time.json new file mode 100644 index 00000000000..a3d2d3a634c --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_earliest_latest_custom_time.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(earliest_message=[$1], latest_message=[$2], level=[$0])\n LogicalAggregate(group=[{0}], earliest_message=[ARG_MIN($1, $2)], latest_message=[ARG_MAX($1, $2)])\n LogicalProject(level=[$4], message=[$3], created_at=[$0])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n", + "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..2=[{inputs}], earliest_message=[$t1], latest_message=[$t2], level=[$t0])\n EnumerableAggregate(group=[{0}], earliest_message=[ARG_MIN($1, $2)], latest_message=[ARG_MAX($1, $2)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[PROJECT->[level, message, created_at]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"level\",\"message\",\"created_at\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" + } +} diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_earliest_latest.json 
b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_earliest_latest.json new file mode 100644 index 00000000000..ad2df524707 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_earliest_latest.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(earliest_message=[$1], latest_message=[$2], server=[$0])\n LogicalAggregate(group=[{0}], earliest_message=[ARG_MIN($1, $2)], latest_message=[ARG_MAX($1, $2)])\n LogicalProject(server=[$1], message=[$3], @timestamp=[$2])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n", + "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..2=[{inputs}], earliest_message=[$t1], latest_message=[$t2], server=[$t0])\n EnumerableAggregate(group=[{1}], earliest_message=[ARG_MIN($3, $2)], latest_message=[ARG_MAX($3, $2)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n" + } +} \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_earliest_latest_custom_time.json b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_earliest_latest_custom_time.json new file mode 100644 index 00000000000..ef5ebfe9e1c --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_earliest_latest_custom_time.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(earliest_message=[$1], latest_message=[$2], level=[$0])\n LogicalAggregate(group=[{0}], earliest_message=[ARG_MIN($1, $2)], latest_message=[ARG_MAX($1, $2)])\n LogicalProject(level=[$4], message=[$3], created_at=[$0])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n", + "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..2=[{inputs}], earliest_message=[$t1], latest_message=[$t2], level=[$t0])\n EnumerableAggregate(group=[{4}], earliest_message=[ARG_MIN($3, $0)], latest_message=[ARG_MAX($3, $0)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n" + } +} \ No newline at end of file diff --git a/integ-test/src/test/resources/indexDefinitions/logs_index_mapping.json b/integ-test/src/test/resources/indexDefinitions/logs_index_mapping.json new file mode 100644 index 00000000000..3259ebaefd5 --- /dev/null +++ b/integ-test/src/test/resources/indexDefinitions/logs_index_mapping.json @@ -0,0 +1,29 @@ +{ + "mappings": { + "properties": { + "server": { + "type": "keyword" + }, + "level": { + "type": "keyword" + }, + "message": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "@timestamp": { + "type": "date", + "format": "strict_date_optional_time||epoch_millis" + }, + "created_at": { + "type": "date", + "format": "strict_date_optional_time||epoch_millis" + } + } + } +} diff --git a/integ-test/src/test/resources/logs.json b/integ-test/src/test/resources/logs.json new file mode 100644 index 00000000000..8431a185172 --- /dev/null +++ b/integ-test/src/test/resources/logs.json @@ -0,0 +1,10 @@ +{"index":{"_id":"1"}} +{"server":"server1","level":"ERROR","message":"Database connection failed","@timestamp":"2023-01-01T00:00:00.000Z","created_at":"2023-01-05T00:00:00.000Z"} +{"index":{"_id":"2"}} +{"server":"server2","level":"INFO","message":"Service 
started","@timestamp":"2023-01-02T00:00:00.000Z","created_at":"2023-01-04T00:00:00.000Z"} +{"index":{"_id":"3"}} +{"server":"server1","level":"WARN","message":"High memory usage","@timestamp":"2023-01-03T00:00:00.000Z","created_at":"2023-01-03T00:00:00.000Z"} +{"index":{"_id":"4"}} +{"server":"server3","level":"ERROR","message":"Disk space low","@timestamp":"2023-01-04T00:00:00.000Z","created_at":"2023-01-02T00:00:00.000Z"} +{"index":{"_id":"5"}} +{"server":"server2","level":"INFO","message":"Backup completed","@timestamp":"2023-01-05T00:00:00.000Z","created_at":"2023-01-01T00:00:00.000Z"} diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index f7aa477ddea..f1832317d92 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -464,6 +464,7 @@ statsFunction | (DISTINCT_COUNT | DC | DISTINCT_COUNT_APPROX) LT_PRTHS valueExpression RT_PRTHS # distinctCountFunctionCall | takeAggFunction # takeAggFunctionCall | percentileApproxFunction # percentileApproxFunctionCall + | earliestLatestFunction # earliestLatestFunctionCall ; statsFunctionName @@ -478,8 +479,10 @@ statsFunctionName | STDDEV_POP | PERCENTILE | PERCENTILE_APPROX - | EARLIEST - | LATEST + ; + +earliestLatestFunction + : (EARLIEST | LATEST) LT_PRTHS valueExpression (COMMA timeField = valueExpression)? RT_PRTHS ; diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java index 63793a0d429..dec1383a2b9 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java @@ -22,7 +22,35 @@ import org.antlr.v4.runtime.ParserRuleContext; import org.antlr.v4.runtime.RuleContext; import org.opensearch.sql.ast.dsl.AstDSL; -import org.opensearch.sql.ast.expression.*; +import org.opensearch.sql.ast.expression.AggregateFunction; +import org.opensearch.sql.ast.expression.Alias; +import org.opensearch.sql.ast.expression.AllFields; +import org.opensearch.sql.ast.expression.And; +import org.opensearch.sql.ast.expression.Between; +import org.opensearch.sql.ast.expression.Case; +import org.opensearch.sql.ast.expression.Cast; +import org.opensearch.sql.ast.expression.Compare; +import org.opensearch.sql.ast.expression.DataType; +import org.opensearch.sql.ast.expression.EqualTo; +import org.opensearch.sql.ast.expression.Field; +import org.opensearch.sql.ast.expression.Function; +import org.opensearch.sql.ast.expression.In; +import org.opensearch.sql.ast.expression.Interval; +import org.opensearch.sql.ast.expression.IntervalUnit; +import org.opensearch.sql.ast.expression.LambdaFunction; +import org.opensearch.sql.ast.expression.Let; +import org.opensearch.sql.ast.expression.Literal; +import org.opensearch.sql.ast.expression.Not; +import org.opensearch.sql.ast.expression.Or; +import org.opensearch.sql.ast.expression.QualifiedName; +import org.opensearch.sql.ast.expression.RelevanceFieldList; +import org.opensearch.sql.ast.expression.Span; +import org.opensearch.sql.ast.expression.SpanUnit; +import org.opensearch.sql.ast.expression.UnresolvedArgument; +import org.opensearch.sql.ast.expression.UnresolvedExpression; +import org.opensearch.sql.ast.expression.When; +import org.opensearch.sql.ast.expression.WindowFunction; +import org.opensearch.sql.ast.expression.Xor; import org.opensearch.sql.ast.expression.subquery.ExistsSubquery; import 
org.opensearch.sql.ast.expression.subquery.InSubquery; import org.opensearch.sql.ast.expression.subquery.ScalarSubquery; @@ -289,6 +317,30 @@ public UnresolvedExpression visitPercentileShortcutFunctionCall( new UnresolvedArgument("percent", AstDSL.doubleLiteral(percent)))); } + public UnresolvedExpression visitEarliestLatestFunctionCall( + OpenSearchPPLParser.EarliestLatestFunctionCallContext ctx) { + return visit(ctx.earliestLatestFunction()); + } + + @Override + public UnresolvedExpression visitEarliestLatestFunction( + OpenSearchPPLParser.EarliestLatestFunctionContext ctx) { + String functionName = ctx.EARLIEST() != null ? "earliest" : "latest"; + UnresolvedExpression valueField = visit(ctx.valueExpression(0)); + + if (ctx.timeField != null) { + // Two parameters: earliest(field, time_field) or latest(field, time_field) + UnresolvedExpression timeField = visit(ctx.timeField); + return new AggregateFunction( + functionName, + valueField, + Collections.singletonList(new UnresolvedArgument("time_field", timeField))); + } else { + // Single parameter: earliest(field) or latest(field) - uses default @timestamp + return new AggregateFunction(functionName, valueField); + } + } + /** Case function. */ @Override public UnresolvedExpression visitCaseFunctionCall( diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEarliestLatestTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEarliestLatestTest.java new file mode 100644 index 00000000000..16a57bacb57 --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEarliestLatestTest.java @@ -0,0 +1,292 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl.calcite; + +import com.google.common.collect.ImmutableList; +import java.sql.Date; +import java.util.List; +import lombok.RequiredArgsConstructor; +import org.apache.calcite.DataContext; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Statistics; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.test.CalciteAssert; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Programs; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.Test; + +/** Unit tests for {@code earliest/latest} functions with @timestamp field in PPL. */ +public class CalcitePPLEarliestLatestTest extends CalcitePPLAbstractTest { + public CalcitePPLEarliestLatestTest() { + super(CalciteAssert.SchemaSpec.POST); + } + + @Override + protected Frameworks.ConfigBuilder config(CalciteAssert.SchemaSpec... 
schemaSpecs) { + final SchemaPlus rootSchema = Frameworks.createRootSchema(true); + final SchemaPlus schema = CalciteAssert.addSchema(rootSchema, schemaSpecs); + + // Add a test table with @timestamp and created_at fields + // Note: @timestamp and created_at have different orderings to test explicit field usage + ImmutableList<Object[]> rows = + ImmutableList.of( + new Object[] { + "server1", + "ERROR", + "Database connection failed", + Date.valueOf("2023-01-01"), + Date.valueOf("2023-01-05") + }, + new Object[] { + "server2", + "INFO", + "Service started", + Date.valueOf("2023-01-02"), + Date.valueOf("2023-01-04") + }, + new Object[] { + "server1", + "WARN", + "High memory usage", + Date.valueOf("2023-01-03"), + Date.valueOf("2023-01-03") + }, + new Object[] { + "server3", + "ERROR", + "Disk space low", + Date.valueOf("2023-01-04"), + Date.valueOf("2023-01-02") + }, + new Object[] { + "server2", + "INFO", + "Backup completed", + Date.valueOf("2023-01-05"), + Date.valueOf("2023-01-01") + }); + schema.add("LOGS", new LogsTable(rows)); + + return Frameworks.newConfigBuilder() + .parserConfig(SqlParser.Config.DEFAULT) + .defaultSchema(schema) + .traitDefs((List<RelTraitDef>) null) + .programs(Programs.heuristicJoinOrder(Programs.RULE_SET, true, 2)); + } + + @Test + public void testEarliestWithoutSecondArgument() { + String ppl = "source=LOGS | stats earliest(message) as earliest_message"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalAggregate(group=[{}], earliest_message=[ARG_MIN($0, $1)])\n" + + " LogicalProject(message=[$2], @timestamp=[$3])\n" + + " LogicalTableScan(table=[[POST, LOGS]])\n"; + verifyLogical(root, expectedLogical); + + String expectedResult = "earliest_message=Database connection failed\n"; + verifyResult(root, expectedResult); + + String expectedSparkSql = + "SELECT ARG_MIN(`message`, `@timestamp`) `earliest_message`\n" + "FROM `POST`.`LOGS`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testLatestWithoutSecondArgument() { + String ppl = "source=LOGS | stats latest(message) as latest_message"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalAggregate(group=[{}], latest_message=[ARG_MAX($0, $1)])\n" + + " LogicalProject(message=[$2], @timestamp=[$3])\n" + + " LogicalTableScan(table=[[POST, LOGS]])\n"; + verifyLogical(root, expectedLogical); + + String expectedResult = "latest_message=Backup completed\n"; + verifyResult(root, expectedResult); + + String expectedSparkSql = + "SELECT ARG_MAX(`message`, `@timestamp`) `latest_message`\n" + "FROM `POST`.`LOGS`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testEarliestByServerWithoutSecondArgument() { + String ppl = "source=LOGS | stats earliest(message) as earliest_message by server"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(earliest_message=[$1], server=[$0])\n" + + " LogicalAggregate(group=[{0}], earliest_message=[ARG_MIN($1, $2)])\n" + + " LogicalProject(server=[$0], message=[$2], @timestamp=[$3])\n" + + " LogicalTableScan(table=[[POST, LOGS]])\n"; + verifyLogical(root, expectedLogical); + + String expectedResult = + "earliest_message=Disk space low; server=server3\n" + + "earliest_message=Service started; server=server2\n" + + "earliest_message=Database connection failed; server=server1\n"; + verifyResult(root, expectedResult); + + String expectedSparkSql = + "SELECT ARG_MIN(`message`, `@timestamp`) `earliest_message`, `server`\n" + + "FROM `POST`.`LOGS`\n" + + "GROUP BY `server`"; +
verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testLatestByServerWithoutSecondArgument() { + String ppl = "source=LOGS | stats latest(message) as latest_message by server"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(latest_message=[$1], server=[$0])\n" + + " LogicalAggregate(group=[{0}], latest_message=[ARG_MAX($1, $2)])\n" + + " LogicalProject(server=[$0], message=[$2], @timestamp=[$3])\n" + + " LogicalTableScan(table=[[POST, LOGS]])\n"; + verifyLogical(root, expectedLogical); + + String expectedResult = + "latest_message=Disk space low; server=server3\n" + + "latest_message=Backup completed; server=server2\n" + + "latest_message=High memory usage; server=server1\n"; + verifyResult(root, expectedResult); + + String expectedSparkSql = + "SELECT ARG_MAX(`message`, `@timestamp`) `latest_message`, `server`\n" + + "FROM `POST`.`LOGS`\n" + + "GROUP BY `server`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testEarliestWithOtherAggregatesWithoutSecondArgument() { + String ppl = + "source=LOGS | stats earliest(message) as earliest_message, count() as cnt by server"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(earliest_message=[$1], cnt=[$2], server=[$0])\n" + + " LogicalAggregate(group=[{0}], earliest_message=[ARG_MIN($1, $2)], cnt=[COUNT()])\n" + + " LogicalProject(server=[$0], message=[$2], @timestamp=[$3])\n" + + " LogicalTableScan(table=[[POST, LOGS]])\n"; + verifyLogical(root, expectedLogical); + + String expectedResult = + "earliest_message=Disk space low; cnt=1; server=server3\n" + + "earliest_message=Service started; cnt=2; server=server2\n" + + "earliest_message=Database connection failed; cnt=2; server=server1\n"; + verifyResult(root, expectedResult); + + String expectedSparkSql = + "SELECT ARG_MIN(`message`, `@timestamp`) `earliest_message`, " + + "COUNT(*) `cnt`, `server`\n" + + "FROM `POST`.`LOGS`\n" + + "GROUP BY `server`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testEarliestWithExplicitTimestampField() { + String ppl = "source=LOGS | stats earliest(message, created_at) as earliest_message"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalAggregate(group=[{}], earliest_message=[ARG_MIN($0, $1)])\n" + + " LogicalProject(message=[$2], created_at=[$4])\n" + + " LogicalTableScan(table=[[POST, LOGS]])\n"; + verifyLogical(root, expectedLogical); + + String expectedResult = "earliest_message=Backup completed\n"; + verifyResult(root, expectedResult); + + String expectedSparkSql = + "SELECT ARG_MIN(`message`, `created_at`) `earliest_message`\n" + "FROM `POST`.`LOGS`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testLatestWithExplicitTimestampField() { + String ppl = "source=LOGS | stats latest(message, created_at) as latest_message"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalAggregate(group=[{}], latest_message=[ARG_MAX($0, $1)])\n" + + " LogicalProject(message=[$2], created_at=[$4])\n" + + " LogicalTableScan(table=[[POST, LOGS]])\n"; + verifyLogical(root, expectedLogical); + + String expectedResult = "latest_message=Database connection failed\n"; + verifyResult(root, expectedResult); + + String expectedSparkSql = + "SELECT ARG_MAX(`message`, `created_at`) `latest_message`\n" + "FROM `POST`.`LOGS`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + // Custom table implementation with @timestamp field + @RequiredArgsConstructor + public static 
class LogsTable implements ScannableTable { + private final ImmutableList<Object[]> rows; + + protected final RelProtoDataType protoRowType = + factory -> + factory + .builder() + .add("server", SqlTypeName.VARCHAR) + .add("level", SqlTypeName.VARCHAR) + .add("message", SqlTypeName.VARCHAR) + .add("@timestamp", SqlTypeName.DATE) + .add("created_at", SqlTypeName.DATE) + .build(); + + @Override + public Enumerable<@Nullable Object[]> scan(DataContext root) { + return Linq4j.asEnumerable(rows); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return protoRowType.apply(typeFactory); + } + + @Override + public Statistic getStatistic() { + return Statistics.of(0d, ImmutableList.of(), RelCollations.createSingleton(0)); + } + + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; + } + + @Override + public boolean isRolledUp(String column) { + return false; + } + + @Override + public boolean rolledUpColumnValidInsideAgg( + String column, + SqlCall call, + @Nullable SqlNode parent, + @Nullable CalciteConnectionConfig config) { + return false; + } + } +}
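
Editor's note on semantics: the fixtures and tests above all lower earliest/latest to ARG_MIN/ARG_MAX over a time column. The following plain-JDK sketch (illustrative only; the LogRecord type and demo class are hypothetical and not part of this change) reproduces the results asserted against the logs.json data: earliest(message) yields the message of the row with the smallest @timestamp, latest(message) the row with the largest, and grouping by server picks a winner per group.

import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical demo, not part of the change: models earliest()/latest() as
// arg-min/arg-max of message over @timestamp for the logs.json fixture rows.
public final class EarliestLatestSemanticsDemo {
  record LogRecord(String server, String message, Instant timestamp) {}

  public static void main(String[] args) {
    List<LogRecord> logs =
        List.of(
            new LogRecord("server1", "Database connection failed", Instant.parse("2023-01-01T00:00:00Z")),
            new LogRecord("server2", "Service started", Instant.parse("2023-01-02T00:00:00Z")),
            new LogRecord("server1", "High memory usage", Instant.parse("2023-01-03T00:00:00Z")),
            new LogRecord("server3", "Disk space low", Instant.parse("2023-01-04T00:00:00Z")),
            new LogRecord("server2", "Backup completed", Instant.parse("2023-01-05T00:00:00Z")));

    // earliest(message) == ARG_MIN(message, @timestamp): value from the row with the minimal timestamp.
    String earliest =
        logs.stream().min(Comparator.comparing(LogRecord::timestamp)).orElseThrow().message();
    // latest(message) == ARG_MAX(message, @timestamp): value from the row with the maximal timestamp.
    String latest =
        logs.stream().max(Comparator.comparing(LogRecord::timestamp)).orElseThrow().message();
    System.out.println(earliest); // Database connection failed
    System.out.println(latest);   // Backup completed

    // latest(message) by server: keep, per server, the record with the larger timestamp.
    Map<String, LogRecord> latestByServer =
        logs.stream()
            .collect(
                Collectors.toMap(
                    LogRecord::server,
                    r -> r,
                    (a, b) -> a.timestamp().isAfter(b.timestamp()) ? a : b));
    latestByServer.forEach((server, r) -> System.out.println(server + " -> " + r.message()));
    // server1 -> High memory usage, server2 -> Backup completed, server3 -> Disk space low
  }
}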
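
CalcitePPLEarliestLatestTest covers the default @timestamp path and the explicit time-field path separately, but not both together. A combined case might look like the sketch below; the method name is a suggestion, and the expected logical-plan and Spark SQL strings are extrapolated from the existing fixtures in this file, so they should be verified against real planner output before being added.

  // Suggested additional case (sketch): explicit time field combined with a `by` clause.
  // Expected strings are extrapolated from the existing tests and may need adjustment.
  @Test
  public void testLatestWithExplicitTimestampFieldByServer() {
    String ppl = "source=LOGS | stats latest(message, created_at) as latest_message by server";
    RelNode root = getRelNode(ppl);
    String expectedLogical =
        "LogicalProject(latest_message=[$1], server=[$0])\n"
            + "  LogicalAggregate(group=[{0}], latest_message=[ARG_MAX($1, $2)])\n"
            + "    LogicalProject(server=[$0], message=[$2], created_at=[$4])\n"
            + "      LogicalTableScan(table=[[POST, LOGS]])\n";
    verifyLogical(root, expectedLogical);

    // By created_at, the per-server winners would be: server1 -> Database connection failed,
    // server2 -> Service started, server3 -> Disk space low. A verifyResult assertion is
    // omitted here because the row order of the grouped output has not been confirmed.
    String expectedSparkSql =
        "SELECT ARG_MAX(`message`, `created_at`) `latest_message`, `server`\n"
            + "FROM `POST`.`LOGS`\n"
            + "GROUP BY `server`";
    verifyPPLToSparkSQL(root, expectedSparkSql);
  }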
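
The integration-test methods that produce the two explain fixtures are not part of this diff. Reconstructed from the plans themselves (an assumption, not the actual test source), the underlying PPL queries would be roughly the following; they also illustrate the one-argument and two-argument forms accepted by the new earliestLatestFunction grammar rule.

// Hypothetical constants, reconstructed from the explain fixtures above; not copied from the IT source.
public final class EarliestLatestExplainQueries {
  // Default time field: earliest/latest over @timestamp, grouped by server.
  public static final String DEFAULT_TIME_QUERY =
      "source=opensearch-sql_test_index_logs"
          + " | stats earliest(message) as earliest_message,"
          + " latest(message) as latest_message by server";

  // Explicit time field: earliest/latest over created_at, grouped by level.
  public static final String CUSTOM_TIME_QUERY =
      "source=opensearch-sql_test_index_logs"
          + " | stats earliest(message, created_at) as earliest_message,"
          + " latest(message, created_at) as latest_message by level";

  private EarliestLatestExplainQueries() {}
}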