diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java index 22943875c87..f82e63302ab 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java @@ -533,6 +533,7 @@ public RexNode visitWindowFunction(WindowFunction node, CalcitePlanContext conte (arguments.isEmpty() || arguments.size() == 1) ? Collections.emptyList() : arguments.subList(1, arguments.size()); + PPLFuncImpTable.INSTANCE.validateAggFunctionSignature(functionName, field, args); return PlanUtils.makeOver( context, functionName, field, args, partitions, List.of(), node.getWindowFrame()); }) diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/PPLOperandTypes.java b/core/src/main/java/org/opensearch/sql/calcite/utils/PPLOperandTypes.java index e6797ee9960..0d343711b3f 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/PPLOperandTypes.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/PPLOperandTypes.java @@ -23,6 +23,10 @@ public class PPLOperandTypes { private PPLOperandTypes() {} public static final UDFOperandMetadata NONE = UDFOperandMetadata.wrap(OperandTypes.family()); + public static final UDFOperandMetadata OPTIONAL_ANY = + UDFOperandMetadata.wrap( + (CompositeOperandTypeChecker) + OperandTypes.family(SqlTypeFamily.ANY).or(OperandTypes.family())); public static final UDFOperandMetadata OPTIONAL_INTEGER = UDFOperandMetadata.wrap( (CompositeOperandTypeChecker) OperandTypes.INTEGER.or(OperandTypes.family())); @@ -43,6 +47,10 @@ private PPLOperandTypes() {} UDFOperandMetadata.wrap( (CompositeOperandTypeChecker) OperandTypes.ANY.or(OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.INTEGER))); + public static final UDFOperandMetadata ANY_OPTIONAL_TIMESTAMP = + UDFOperandMetadata.wrap( + (CompositeOperandTypeChecker) + 
OperandTypes.ANY.or(OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.TIMESTAMP))); public static final UDFOperandMetadata INTEGER_INTEGER = UDFOperandMetadata.wrap((FamilyOperandTypeChecker) OperandTypes.INTEGER_INTEGER); public static final UDFOperandMetadata STRING_STRING = @@ -121,6 +129,12 @@ private PPLOperandTypes() {} (CompositeOperandTypeChecker) OperandTypes.DATETIME.or( OperandTypes.family(SqlTypeFamily.DATETIME, SqlTypeFamily.INTEGER))); + public static final UDFOperandMetadata ANY_DATETIME_OR_STRING = + UDFOperandMetadata.wrap( + (CompositeOperandTypeChecker) + OperandTypes.family(SqlTypeFamily.ANY) + .or(OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.DATETIME)) + .or(OperandTypes.family(SqlTypeFamily.ANY, SqlTypeFamily.STRING))); public static final UDFOperandMetadata DATETIME_DATETIME = UDFOperandMetadata.wrap(OperandTypes.family(SqlTypeFamily.DATETIME, SqlTypeFamily.DATETIME)); diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java b/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java index f98aebd2e7c..80658557aa9 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java @@ -14,6 +14,8 @@ import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.calcite.plan.RelOptTable; import org.apache.calcite.rel.RelHomogeneousShuttle; @@ -21,6 +23,7 @@ import org.apache.calcite.rel.RelShuttle; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; @@ -391,4 +394,19 @@ public Void visitInputRef(RexInputRef inputRef) { visitor.visitEach(rexNodes); return 
selectedColumns; } + + /** + * Get a string representation of the argument types expressed in ExprType for error messages. + * + * @param argTypes the list of argument types as {@link RelDataType} + * @return a string in the format [type1,type2,...] representing the argument types + */ + public static String getActualSignature(List argTypes) { + return "[" + + argTypes.stream() + .map(OpenSearchTypeFactory::convertRelDataTypeToExprType) + .map(Objects::toString) + .collect(Collectors.joining(",")) + + "]"; + } } diff --git a/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java b/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java index a9b3f7dc013..c2079a60183 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java @@ -380,8 +380,8 @@ public enum BuiltinFunctionName { .put("stddev", BuiltinFunctionName.STDDEV_POP) .put("stddev_pop", BuiltinFunctionName.STDDEV_POP) .put("stddev_samp", BuiltinFunctionName.STDDEV_SAMP) - // .put("earliest", BuiltinFunctionName.EARLIEST) - // .put("latest", BuiltinFunctionName.LATEST) + .put("earliest", BuiltinFunctionName.EARLIEST) + .put("latest", BuiltinFunctionName.LATEST) .put("distinct_count_approx", BuiltinFunctionName.DISTINCT_COUNT_APPROX) .put("dc", BuiltinFunctionName.DISTINCT_COUNT_APPROX) .put("distinct_count", BuiltinFunctionName.DISTINCT_COUNT_APPROX) diff --git a/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java b/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java index a56d2bc06d8..89026c17699 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java @@ -231,7 +231,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import 
java.util.Objects; import java.util.Optional; import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; @@ -261,7 +260,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.sql.calcite.CalcitePlanContext; -import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory; import org.opensearch.sql.calcite.utils.PPLOperandTypes; import org.opensearch.sql.calcite.utils.PlanUtils; import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils; @@ -407,25 +405,40 @@ public void registerExternalAggOperator( aggExternalFunctionRegistry.put(functionName, Pair.of(signature, handler)); } + public void validateAggFunctionSignature( + BuiltinFunctionName functionName, RexNode field, List argList) { + var implementation = getImplementation(functionName); + validateFunctionArgs(implementation, functionName, field, argList); + } + public RelBuilder.AggCall resolveAgg( BuiltinFunctionName functionName, boolean distinct, RexNode field, List argList, CalcitePlanContext context) { - var implementation = aggExternalFunctionRegistry.get(functionName); - if (implementation == null) { - implementation = aggFunctionRegistry.get(functionName); - } - if (implementation == null) { - throw new IllegalStateException(String.format("Cannot resolve function: %s", functionName)); - } + var implementation = getImplementation(functionName); + + // Validation is done based on original argument types to generate error from user perspective. 
+ validateFunctionArgs(implementation, functionName, field, argList); + + var handler = implementation.getValue(); + return handler.apply(distinct, field, argList, context); + } + + static void validateFunctionArgs( + Pair implementation, + BuiltinFunctionName functionName, + RexNode field, + List argList) { CalciteFuncSignature signature = implementation.getKey(); + List argTypes = new ArrayList<>(); if (field != null) { argTypes.add(field.getType()); } - // Currently only PERCENTILE_APPROX and TAKE have additional arguments. + + // Currently only PERCENTILE_APPROX, TAKE, EARLIEST, and LATEST have additional arguments. // Their additional arguments will always come as a map of List additionalArgTypes = argList.stream().map(PlanUtils::derefMapCall).map(RexNode::getType).toList(); @@ -441,10 +454,20 @@ public RelBuilder.AggCall resolveAgg( errorMessagePattern, functionName, signature.typeChecker().getAllowedSignatures(), - getActualSignature(argTypes))); + PlanUtils.getActualSignature(argTypes))); } - var handler = implementation.getValue(); - return handler.apply(distinct, field, argList, context); + } + + private Pair getImplementation( + BuiltinFunctionName functionName) { + var implementation = aggExternalFunctionRegistry.get(functionName); + if (implementation == null) { + implementation = aggFunctionRegistry.get(functionName); + } + if (implementation == null) { + throw new IllegalStateException(String.format("Cannot resolve function: %s", functionName)); + } + return implementation; } public RexNode resolve(final RexBuilder builder, final String functionName, RexNode... 
args) { @@ -492,7 +515,7 @@ public RexNode resolve( throw new ExpressionEvaluationException( String.format( "Cannot resolve function: %s, arguments: %s, caused by: %s", - functionName, getActualSignature(argTypes), e.getMessage()), + functionName, PlanUtils.getActualSignature(argTypes), e.getMessage()), e); } StringJoiner allowedSignatures = new StringJoiner(","); @@ -505,7 +528,7 @@ functionName, getActualSignature(argTypes), e.getMessage()), throw new ExpressionEvaluationException( String.format( "%s function expects {%s}, but got %s", - functionName, allowedSignatures, getActualSignature(argTypes))); + functionName, allowedSignatures, PlanUtils.getActualSignature(argTypes))); } /** @@ -1072,21 +1095,6 @@ void registerOperator(BuiltinFunctionName functionName, SqlAggFunction aggFuncti register(functionName, handler, typeChecker); } - private static RexNode resolveTimeField(List argList, CalcitePlanContext ctx) { - if (argList.isEmpty()) { - // Try to find @timestamp field - var timestampField = - ctx.relBuilder.peek().getRowType().getField("@timestamp", false, false); - if (timestampField == null) { - throw new IllegalArgumentException( - "Default @timestamp field not found. 
Please specify a time field explicitly."); - } - return ctx.rexBuilder.makeInputRef(timestampField.getType(), timestampField.getIndex()); - } else { - return PlanUtils.derefMapCall(argList.get(0)); - } - } - void populate() { registerOperator(MAX, SqlStdOperatorTable.MAX); registerOperator(MIN, SqlStdOperatorTable.MIN); @@ -1116,8 +1124,7 @@ void populate() { return ctx.relBuilder.count(distinct, null, field); } }, - wrapSqlOperandTypeChecker( - SqlStdOperatorTable.COUNT.getOperandTypeChecker(), COUNT.name(), false)); + wrapSqlOperandTypeChecker(PPLOperandTypes.OPTIONAL_ANY, COUNT.name(), false)); register( PERCENTILE_APPROX, @@ -1164,20 +1171,22 @@ void populate() { register( EARLIEST, (distinct, field, argList, ctx) -> { - RexNode timeField = resolveTimeField(argList, ctx); - return ctx.relBuilder.aggregateCall(SqlStdOperatorTable.ARG_MIN, field, timeField); + List args = resolveTimeField(argList, ctx); + return UserDefinedFunctionUtils.makeAggregateCall( + SqlStdOperatorTable.ARG_MIN, List.of(field), args, ctx.relBuilder); }, wrapSqlOperandTypeChecker( - SqlStdOperatorTable.ARG_MIN.getOperandTypeChecker(), EARLIEST.name(), false)); + PPLOperandTypes.ANY_OPTIONAL_TIMESTAMP, EARLIEST.name(), false)); register( LATEST, (distinct, field, argList, ctx) -> { - RexNode timeField = resolveTimeField(argList, ctx); - return ctx.relBuilder.aggregateCall(SqlStdOperatorTable.ARG_MAX, field, timeField); + List args = resolveTimeField(argList, ctx); + return UserDefinedFunctionUtils.makeAggregateCall( + SqlStdOperatorTable.ARG_MAX, List.of(field), args, ctx.relBuilder); }, wrapSqlOperandTypeChecker( - SqlStdOperatorTable.ARG_MAX.getOperandTypeChecker(), LATEST.name(), false)); + PPLOperandTypes.ANY_OPTIONAL_TIMESTAMP, LATEST.name(), false)); // Register FIRST function - uses document order register( @@ -1201,19 +1210,19 @@ void populate() { } } - /** - * Get a string representation of the argument types expressed in ExprType for error messages. 
- * - * @param argTypes the list of argument types as {@link RelDataType} - * @return a string in the format [type1,type2,...] representing the argument types - */ - private static String getActualSignature(List argTypes) { - return "[" - + argTypes.stream() - .map(OpenSearchTypeFactory::convertRelDataTypeToExprType) - .map(Objects::toString) - .collect(Collectors.joining(",")) - + "]"; + static List resolveTimeField(List argList, CalcitePlanContext ctx) { + if (argList.isEmpty()) { + // Try to find @timestamp field + var timestampField = ctx.relBuilder.peek().getRowType().getField("@timestamp", false, false); + if (timestampField == null) { + throw new IllegalArgumentException( + "Default @timestamp field not found. Please specify a time field explicitly."); + } + return List.of( + ctx.rexBuilder.makeInputRef(timestampField.getType(), timestampField.getIndex())); + } else { + return argList.stream().map(PlanUtils::derefMapCall).collect(Collectors.toList()); + } } /** @@ -1257,6 +1266,8 @@ private static PPLTypeChecker wrapSqlOperandTypeChecker( pplTypeChecker = PPLTypeChecker.wrapComparable(comparableTypeChecker); } else if (typeChecker instanceof UDFOperandMetadata.UDTOperandMetadata udtOperandMetadata) { pplTypeChecker = PPLTypeChecker.wrapUDT(udtOperandMetadata.allowedParamTypes()); + } else if (typeChecker != null) { + pplTypeChecker = PPLTypeChecker.wrapDefault(typeChecker); } else { logger.info( "Cannot create type checker for function: {}. 
Will skip its type checking", functionName); diff --git a/core/src/main/java/org/opensearch/sql/expression/function/PPLTypeChecker.java b/core/src/main/java/org/opensearch/sql/expression/function/PPLTypeChecker.java index adc10e63b71..bb58a38a109 100644 --- a/core/src/main/java/org/opensearch/sql/expression/function/PPLTypeChecker.java +++ b/core/src/main/java/org/opensearch/sql/expression/function/PPLTypeChecker.java @@ -52,7 +52,7 @@ public interface PPLTypeChecker { /** * Get a string representation of the allowed signatures. The format is like {@code - * [STRING,STRING],[INTEGER,INTEGER]}. + * [STRING,STRING]|[INTEGER,INTEGER]}. * * @return a string representation of the allowed signatures */ @@ -220,7 +220,7 @@ public String getAllowedSignatures() { for (SqlOperandTypeChecker rule : allowedRules) { if (rule instanceof FamilyOperandTypeChecker familyOperandTypeChecker) { if (!builder.isEmpty()) { - builder.append(","); + builder.append("|"); } builder.append(PPLTypeChecker.getFamilySignatures(familyOperandTypeChecker)); } else { @@ -343,6 +343,84 @@ public List> getParameterTypes() { } } + class PPLDefaultTypeChecker implements PPLTypeChecker { + private final SqlOperandTypeChecker internal; + + public PPLDefaultTypeChecker(SqlOperandTypeChecker typeChecker) { + internal = typeChecker; + } + + @Override + public boolean checkOperandTypes(List types) { + // Basic operand count validation + if (!internal.getOperandCountRange().isValidCount(types.size())) { + return false; + } + + // If the internal checker is a FamilyOperandTypeChecker, use type family validation + if (internal instanceof FamilyOperandTypeChecker familyChecker) { + List families = + IntStream.range(0, types.size()) + .mapToObj(familyChecker::getOperandSqlTypeFamily) + .collect(Collectors.toList()); + return validateOperands(families, types); + } + + // For other types of checkers, we can only validate operand count + // This is a fallback - we assume the types are valid if count is correct + 
return true; + } + + @Override + public String getAllowedSignatures() { + if (internal instanceof FamilyOperandTypeChecker familyChecker) { + return getFamilySignatures(familyChecker); + } else { + // Generate a generic signature based on operand count range + int min = internal.getOperandCountRange().getMin(); + int max = internal.getOperandCountRange().getMax(); + + if (min == -1 || max == -1) { + return "[ANY...]"; + } else if (min == max) { + return "[" + String.join(",", Collections.nCopies(min, "ANY")) + "]"; + } else { + List signatures = new ArrayList<>(); + final int MAX_ARGS = 10; + max = Math.min(MAX_ARGS, max); + for (int i = min; i <= max; i++) { + signatures.add("[" + String.join(",", Collections.nCopies(i, "ANY")) + "]"); + } + return String.join("|", signatures); + } + } + } + + @Override + public List> getParameterTypes() { + if (internal instanceof FamilyOperandTypeChecker familyChecker) { + return getExprSignatures(familyChecker); + } else { + // For unknown type checkers, return UNKNOWN types + int min = internal.getOperandCountRange().getMin(); + int max = internal.getOperandCountRange().getMax(); + + if (min == -1 || max == -1) { + // Variable arguments - return a single signature with UNKNOWN + return List.of(List.of(ExprCoreType.UNKNOWN)); + } else { + List> parameterTypes = new ArrayList<>(); + final int MAX_ARGS = 10; + max = Math.min(MAX_ARGS, max); + for (int i = min; i <= max; i++) { + parameterTypes.add(Collections.nCopies(i, ExprCoreType.UNKNOWN)); + } + return parameterTypes; + } + } + } + } + /** * Creates a {@link PPLFamilyTypeChecker} with a fixed operand count, validating that each operand * belongs to its corresponding {@link SqlTypeFamily}. 
@@ -418,6 +496,22 @@ static PPLComparableTypeChecker wrapComparable(SameOperandTypeChecker typeChecke return new PPLComparableTypeChecker(typeChecker); } + /** + * Creates a {@link PPLDefaultTypeChecker} that wraps any {@link SqlOperandTypeChecker} and + * provides basic type checking functionality when specialized PPL type checkers cannot be used. + * + *

This is a fallback wrapper that provides basic operand count validation and attempts to + * extract type family information when possible. It should be used when other specialized PPL + * type checkers (like {@link PPLFamilyTypeChecker}, {@link PPLCompositeTypeChecker}, etc.) are + * not applicable. + * + * @param typeChecker the Calcite type checker to wrap + * @return a {@link PPLDefaultTypeChecker} that provides basic type checking functionality + */ + static PPLDefaultTypeChecker wrapDefault(SqlOperandTypeChecker typeChecker) { + return new PPLDefaultTypeChecker(typeChecker); + } + /** * Create a {@link PPLTypeChecker} from a list of allowed signatures consisted of {@link * ExprType}. This is useful to validate arguments against user-defined types (UDT) that does not @@ -599,6 +693,6 @@ private static String formatExprSignatures(List> signatures) { .map(t -> t == ExprCoreType.UNDEFINED ? "ANY" : t.toString()) .collect(Collectors.joining(",")) + "]") - .collect(Collectors.joining(",")); + .collect(Collectors.joining("|")); } } diff --git a/core/src/test/java/org/opensearch/sql/expression/aggregation/PercentileApproxAggregatorTest.java b/core/src/test/java/org/opensearch/sql/expression/aggregation/PercentileApproxAggregatorTest.java index 1936c1380ee..5d1d86189bf 100644 --- a/core/src/test/java/org/opensearch/sql/expression/aggregation/PercentileApproxAggregatorTest.java +++ b/core/src/test/java/org/opensearch/sql/expression/aggregation/PercentileApproxAggregatorTest.java @@ -200,8 +200,7 @@ public void test_percentile_with_invalid_size() { tuples)); assertEquals( "percentile_approx function expected" - + " {[INTEGER,DOUBLE],[INTEGER,DOUBLE,DOUBLE],[LONG,DOUBLE],[LONG,DOUBLE,DOUBLE]," - + "[FLOAT,DOUBLE],[FLOAT,DOUBLE,DOUBLE],[DOUBLE,DOUBLE],[DOUBLE,DOUBLE,DOUBLE]}," + + " {[INTEGER,DOUBLE],[INTEGER,DOUBLE,DOUBLE],[LONG,DOUBLE],[LONG,DOUBLE,DOUBLE],[FLOAT,DOUBLE],[FLOAT,DOUBLE,DOUBLE],[DOUBLE,DOUBLE],[DOUBLE,DOUBLE,DOUBLE]}," + " but got [DOUBLE,STRING]", 
exception2.getMessage()); } diff --git a/docs/category.json b/docs/category.json index e841ca84376..38c16255d03 100644 --- a/docs/category.json +++ b/docs/category.json @@ -56,11 +56,12 @@ ], "ppl_cli_calcite": [ "user/ppl/cmd/append.rst", + "user/ppl/cmd/eventstats.rst", "user/ppl/cmd/fields.rst", "user/ppl/cmd/regex.rst", + "user/ppl/cmd/rename.rst", "user/ppl/cmd/rex.rst", "user/ppl/cmd/stats.rst", - "user/ppl/cmd/rename.rst", "user/ppl/cmd/timechart.rst" ] } diff --git a/docs/user/ppl/cmd/eventstats.rst b/docs/user/ppl/cmd/eventstats.rst index f401a0f2d97..f81401934ec 100644 --- a/docs/user/ppl/cmd/eventstats.rst +++ b/docs/user/ppl/cmd/eventstats.rst @@ -93,16 +93,16 @@ Usage: Returns a count of the number of expr in the rows retrieved by a SELECT s Example:: - PPL> source=accounts | eventstats count(); + os> source=accounts | fields account_number, gender, age | eventstats count() | sort account_number; fetched rows / total rows = 4/4 - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+---------+ - | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | count() | - |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+---------| - | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 4 | - | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 4 | - | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 4 | - | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 4 | - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+---------+ + 
+----------------+--------+-----+---------+ + | account_number | gender | age | count() | + |----------------+--------+-----+---------| + | 1 | M | 32 | 4 | + | 6 | M | 36 | 4 | + | 13 | F | 28 | 4 | + | 18 | M | 33 | 4 | + +----------------+--------+-----+---------+ SUM --- @@ -114,16 +114,16 @@ Usage: SUM(expr). Returns the sum of expr. Example:: - PPL> source=accounts | eventstats sum(age) by gender; + os> source=accounts | fields account_number, gender, age | eventstats sum(age) by gender | sort account_number; fetched rows / total rows = 4/4 - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+----------+ - | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | sum(age) | - |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+----------| - | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 28 | - | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 101 | - | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 101 | - | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 101 | - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+----------+ + +----------------+--------+-----+----------+ + | account_number | gender | age | sum(age) | + |----------------+--------+-----+----------| + | 1 | M | 32 | 101 | + | 6 | M | 36 | 101 | + | 13 | F | 28 | 28 | + | 18 | M | 33 | 101 | + +----------------+--------+-----+----------+ AVG --- @@ -135,16 +135,16 @@ Usage: AVG(expr). Returns the average value of expr. 
Example:: - PPL> source=accounts | eventstats avg(age) by gender; + os> source=accounts | fields account_number, gender, age | eventstats avg(age) by gender | sort account_number; fetched rows / total rows = 4/4 - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+ - | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | avg(age) | - |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------| - | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 28.0 | - | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 33.666666666666664 | - | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 33.666666666666664 | - | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 33.666666666666664 | - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+ + +----------------+--------+-----+--------------------+ + | account_number | gender | age | avg(age) | + |----------------+--------+-----+--------------------| + | 1 | M | 32 | 33.666666666666664 | + | 6 | M | 36 | 33.666666666666664 | + | 13 | F | 28 | 28.0 | + | 18 | M | 33 | 33.666666666666664 | + +----------------+--------+-----+--------------------+ MAX --- @@ -156,16 +156,16 @@ Usage: MAX(expr). Returns the maximum value of expr. 
Example:: - PPL> source=accounts | eventstats max(age); + os> source=accounts | fields account_number, gender, age | eventstats max(age) | sort account_number; fetched rows / total rows = 4/4 - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+----------+ - | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | max(age) | - |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+----------| - | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 36 | - | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 36 | - | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 36 | - | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 36 | - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+----------+ + +----------------+--------+-----+----------+ + | account_number | gender | age | max(age) | + |----------------+--------+-----+----------| + | 1 | M | 32 | 36 | + | 6 | M | 36 | 36 | + | 13 | F | 28 | 36 | + | 18 | M | 33 | 36 | + +----------------+--------+-----+----------+ MIN --- @@ -177,16 +177,16 @@ Usage: MIN(expr). Returns the minimum value of expr. 
Example:: - PPL> source=accounts | eventstats min(age) by gender; + os> source=accounts | fields account_number, gender, age | eventstats min(age) by gender | sort account_number; fetched rows / total rows = 4/4 - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+----------+ - | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | min(age) | - |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+----------| - | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 28 | - | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 32 | - | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 32 | - | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 32 | - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+----------+ + +----------------+--------+-----+----------+ + | account_number | gender | age | min(age) | + |----------------+--------+-----+----------| + | 1 | M | 32 | 32 | + | 6 | M | 36 | 32 | + | 13 | F | 28 | 28 | + | 18 | M | 33 | 32 | + +----------------+--------+-----+----------+ VAR_SAMP @@ -199,16 +199,16 @@ Usage: VAR_SAMP(expr). Returns the sample variance of expr. 
Example:: - PPL> source=accounts | eventstats var_samp(age); + os> source=accounts | fields account_number, gender, age | eventstats var_samp(age) | sort account_number; fetched rows / total rows = 4/4 - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+ - | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | var_samp(age) | - |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------| - | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 10.916666666666666 | - | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 10.916666666666666 | - | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 10.916666666666666 | - | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 10.916666666666666 | - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+ + +----------------+--------+-----+--------------------+ + | account_number | gender | age | var_samp(age) | + |----------------+--------+-----+--------------------| + | 1 | M | 32 | 10.916666666666666 | + | 6 | M | 36 | 10.916666666666666 | + | 13 | F | 28 | 10.916666666666666 | + | 18 | M | 33 | 10.916666666666666 | + +----------------+--------+-----+--------------------+ VAR_POP @@ -221,17 +221,16 @@ Usage: VAR_POP(expr). Returns the population standard variance of expr. 
Example:: - PPL> source=accounts | eventstats var_pop(age); + os> source=accounts | fields account_number, gender, age | eventstats var_pop(age) | sort account_number; fetched rows / total rows = 4/4 - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------+ - | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | var_pop(age) | - |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------| - | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 8.1875 | - | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 8.1875 | - | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 8.1875 | - | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 8.1875 | - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------+ - + +----------------+--------+-----+--------------+ + | account_number | gender | age | var_pop(age) | + |----------------+--------+-----+--------------| + | 1 | M | 32 | 8.1875 | + | 6 | M | 36 | 8.1875 | + | 13 | F | 28 | 8.1875 | + | 18 | M | 33 | 8.1875 | + +----------------+--------+-----+--------------+ STDDEV_SAMP ----------- @@ -243,16 +242,16 @@ Usage: STDDEV_SAMP(expr). Return the sample standard deviation of expr. 
Example:: - PPL> source=accounts | eventstats stddev_samp(age); + os> source=accounts | fields account_number, gender, age | eventstats stddev_samp(age) | sort account_number; fetched rows / total rows = 4/4 - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-------------------+ - | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | stddev_samp(age) | - |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-------------------| - | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 3.304037933599835 | - | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 3.304037933599835 | - | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 3.304037933599835 | - | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 3.304037933599835 | - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-------------------+ + +----------------+--------+-----+-------------------+ + | account_number | gender | age | stddev_samp(age) | + |----------------+--------+-----+-------------------| + | 1 | M | 32 | 3.304037933599835 | + | 6 | M | 36 | 3.304037933599835 | + | 13 | F | 28 | 3.304037933599835 | + | 18 | M | 33 | 3.304037933599835 | + +----------------+--------+-----+-------------------+ STDDEV_POP @@ -265,16 +264,16 @@ Usage: STDDEV_POP(expr). Return the population standard deviation of expr. 
Example:: - PPL> source=accounts | eventstats stddev_pop(age); + os> source=accounts | fields account_number, gender, age | eventstats stddev_pop(age) | sort account_number; fetched rows / total rows = 4/4 - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+ - | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | stddev_pop(age) | - |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------| - | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 2.8613807855648994 | - | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 2.8613807855648994 | - | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 2.8613807855648994 | - | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 2.8613807855648994 | - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+ + +----------------+--------+-----+--------------------+ + | account_number | gender | age | stddev_pop(age) | + |----------------+--------+-----+--------------------| + | 1 | M | 32 | 2.8613807855648994 | + | 6 | M | 36 | 2.8613807855648994 | + | 13 | F | 28 | 2.8613807855648994 | + | 18 | M | 33 | 2.8613807855648994 | + +----------------+--------+-----+--------------------+ DISTINCT_COUNT, DC(Since 3.3) @@ -290,16 +289,111 @@ For details on algorithm accuracy and precision control, see the `OpenSearch Car Example:: - PPL> source=accounts | eventstats dc(state) as distinct_states, distinct_count(state) as dc_states_alt by gender; + os> source=accounts | fields 
account_number, gender, state, age | eventstats dc(state) as distinct_states, distinct_count(state) as dc_states_alt by gender | sort account_number; fetched rows / total rows = 4/4 - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-----------------+-----------------+ - | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | distinct_states | dc_states_alt | - |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-----------------|-----------------| - | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 1 | 1 | - | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 3 | 3 | - | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 3 | 3 | - | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 3 | 3 | - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-----------------+-----------------+ + +----------------+--------+-------+-----+-----------------+---------------+ + | account_number | gender | state | age | distinct_states | dc_states_alt | + |----------------+--------+-------+-----+-----------------+---------------| + | 1 | M | IL | 32 | 3 | 3 | + | 6 | M | TN | 36 | 3 | 3 | + | 13 | F | VA | 28 | 1 | 1 | + | 18 | M | MD | 33 | 3 | 3 | + +----------------+--------+-------+-----+-----------------+---------------+ + +EARLIEST (Since 3.3) +--------------------- + +Description +>>>>>>>>>>> + +Usage: EARLIEST(field [, time_field]). Return the earliest value of a field based on timestamp ordering. 
This function enriches each event with the earliest value found within the specified grouping. + +* field: mandatory. The field to return the earliest value for. +* time_field: optional. The field to use for time-based ordering. Defaults to @timestamp if not specified. + +Note: This function requires Calcite to be enabled (see `Configuration`_ section above). + +Example:: + + os> source=events | fields @timestamp, host, message | eventstats earliest(message) by host | sort @timestamp; + fetched rows / total rows = 8/8 + +---------------------+---------+----------------------+-------------------+ + | @timestamp | host | message | earliest(message) | + |---------------------+---------+----------------------+-------------------| + | 2023-01-01 10:00:00 | server1 | Starting up | Starting up | + | 2023-01-01 10:05:00 | server2 | Initializing | Initializing | + | 2023-01-01 10:10:00 | server1 | Ready to serve | Starting up | + | 2023-01-01 10:15:00 | server2 | Ready | Initializing | + | 2023-01-01 10:20:00 | server1 | Processing requests | Starting up | + | 2023-01-01 10:25:00 | server2 | Handling connections | Initializing | + | 2023-01-01 10:30:00 | server1 | Shutting down | Starting up | + | 2023-01-01 10:35:00 | server2 | Maintenance mode | Initializing | + +---------------------+---------+----------------------+-------------------+ + +Example with custom time field:: + + os> source=events | fields event_time, status, category | eventstats earliest(status, event_time) by category | sort event_time; + fetched rows / total rows = 8/8 + +---------------------+------------+----------+------------------------------+ + | event_time | status | category | earliest(status, event_time) | + |---------------------+------------+----------+------------------------------| + | 2023-01-01 09:55:00 | pending | orders | pending | + | 2023-01-01 10:00:00 | active | users | active | + | 2023-01-01 10:05:00 | processing | orders | pending | + | 2023-01-01 10:10:00 | inactive | users | 
active | + | 2023-01-01 10:15:00 | completed | orders | pending | + | 2023-01-01 10:20:00 | pending | users | active | + | 2023-01-01 10:25:00 | cancelled | orders | pending | + | 2023-01-01 10:30:00 | inactive | users | active | + +---------------------+------------+----------+------------------------------+ + + +LATEST (Since 3.3) +------------------- + +Description +>>>>>>>>>>> + +Usage: LATEST(field [, time_field]). Return the latest value of a field based on timestamp ordering. This function enriches each event with the latest value found within the specified grouping. + +* field: mandatory. The field to return the latest value for. +* time_field: optional. The field to use for time-based ordering. Defaults to @timestamp if not specified. + +Note: This function requires Calcite to be enabled (see `Configuration`_ section above). + +Example:: + + os> source=events | fields @timestamp, host, message | eventstats latest(message) by host | sort @timestamp; + fetched rows / total rows = 8/8 + +---------------------+---------+----------------------+------------------+ + | @timestamp | host | message | latest(message) | + |---------------------+---------+----------------------+------------------| + | 2023-01-01 10:00:00 | server1 | Starting up | Shutting down | + | 2023-01-01 10:05:00 | server2 | Initializing | Maintenance mode | + | 2023-01-01 10:10:00 | server1 | Ready to serve | Shutting down | + | 2023-01-01 10:15:00 | server2 | Ready | Maintenance mode | + | 2023-01-01 10:20:00 | server1 | Processing requests | Shutting down | + | 2023-01-01 10:25:00 | server2 | Handling connections | Maintenance mode | + | 2023-01-01 10:30:00 | server1 | Shutting down | Shutting down | + | 2023-01-01 10:35:00 | server2 | Maintenance mode | Maintenance mode | + +---------------------+---------+----------------------+------------------+ + +Example with custom time field:: + + os> source=events | fields event_time, status, message, category | eventstats latest(status, event_time) 
by category | sort event_time; + fetched rows / total rows = 8/8 + +---------------------+------------+----------------------+----------+----------------------------+ + | event_time | status | message | category | latest(status, event_time) | + |---------------------+------------+----------------------+----------+----------------------------| + | 2023-01-01 09:55:00 | pending | Starting up | orders | cancelled | + | 2023-01-01 10:00:00 | active | Initializing | users | inactive | + | 2023-01-01 10:05:00 | processing | Ready to serve | orders | cancelled | + | 2023-01-01 10:10:00 | inactive | Ready | users | inactive | + | 2023-01-01 10:15:00 | completed | Processing requests | orders | cancelled | + | 2023-01-01 10:20:00 | pending | Handling connections | users | inactive | + | 2023-01-01 10:25:00 | cancelled | Shutting down | orders | cancelled | + | 2023-01-01 10:30:00 | inactive | Maintenance mode | users | inactive | + +---------------------+------------+----------------------+----------+----------------------------+ Configuration @@ -348,17 +442,16 @@ The example show calculate the average age, sum age and count of events of all t PPL query:: - PPL> source=accounts | eventstats avg(age), sum(age), count() by gender; + os> source=accounts | fields account_number, gender, age | eventstats avg(age), sum(age), count() by gender | sort account_number; fetched rows / total rows = 4/4 - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+----------+---------+ - | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | avg(age) | sum(age) | count() | - |----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+----------+---------| - | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 
28 | null | Bates | 28.0 | 28 | 1 | - | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 33.666666666666664 | 101 | 3 | - | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 33.666666666666664 | 101 | 3 | - | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 33.666666666666664 | 101 | 3 | - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+--------------------+----------+---------+ - + +----------------+--------+-----+--------------------+----------+---------+ + | account_number | gender | age | avg(age) | sum(age) | count() | + |----------------+--------+-----+--------------------+----------+---------| + | 1 | M | 32 | 33.666666666666664 | 101 | 3 | + | 6 | M | 36 | 33.666666666666664 | 101 | 3 | + | 13 | F | 28 | 28.0 | 28 | 1 | + | 18 | M | 33 | 33.666666666666664 | 101 | 3 | + +----------------+--------+-----+--------------------+----------+---------+ Example 2: Calculate the count by a gender and span =================================================== @@ -367,14 +460,13 @@ The example gets the count of age by the interval of 10 years and group by gende PPL query:: - PPL> source=accounts | eventstats count() as cnt by span(age, 5) as age_span, gender + os> source=accounts | fields account_number, gender, age | eventstats count() as cnt by span(age, 5) as age_span, gender | sort account_number; fetched rows / total rows = 4/4 - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-----+ - | account_number | firstname | address | balance | gender | city | employer | state | age | email | lastname | cnt | - 
|----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-----| - | 1 | Amber | 880 Holmes Lane | 39225 | M | Brogan | Pyrami | IL | 32 | amberduke@pyrami.com | Duke | 2 | - | 18 | Dale | 467 Hutchinson Court | 4180 | M | Orick | null | MD | 33 | daleadams@boink.com | Adams | 2 | - | 13 | Nanette | 789 Madison Street | 32838 | F | Nogal | Quility | VA | 28 | null | Bates | 1 | - | 6 | Hattie | 671 Bristol Street | 5686 | M | Dante | Netagy | TN | 36 | hattiebond@netagy.com | Bond | 1 | - +----------------+-----------+----------------------+---------+--------+--------+----------+-------+-----+-----------------------+----------+-----+ - + +----------------+--------+-----+-----+ + | account_number | gender | age | cnt | + |----------------+--------+-----+-----| + | 1 | M | 32 | 2 | + | 6 | M | 36 | 1 | + | 13 | F | 28 | 1 | + | 18 | M | 33 | 2 | + +----------------+--------+-----+-----+ diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index c0265dd2738..e5242a50d1c 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -13,7 +13,6 @@ import java.io.IOException; import java.util.Locale; -import org.junit.Assume; import org.junit.Ignore; import org.junit.Test; import org.opensearch.sql.ppl.ExplainIT; @@ -115,7 +114,7 @@ public void supportPushDownSortMergeJoin() throws IOException { @Ignore("We've supported script push down on text field") @Test public void supportPartialPushDown() throws IOException { - Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled()); + enabledOnlyWhenPushdownIsEnabled(); // field `address` is text type without keyword subfield, so we cannot push it down. 
String query = "source=opensearch-sql_test_index_account | where (state = 'Seattle' or age < 10) and (age" @@ -129,7 +128,7 @@ public void supportPartialPushDown() throws IOException { @Ignore("We've supported script push down on text field") @Test public void supportPartialPushDown_NoPushIfAllFailed() throws IOException { - Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled()); + enabledOnlyWhenPushdownIsEnabled(); // field `address` is text type without keyword subfield, so we cannot push it down. String query = "source=opensearch-sql_test_index_account | where (address = '671 Bristol Street' or age <" @@ -187,7 +186,7 @@ public void testExplainIsNullOrOthers() throws IOException { @Ignore("We've supported script push down on text field") @Test public void supportPartialPushDownScript() throws IOException { - Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled()); + enabledOnlyWhenPushdownIsEnabled(); // field `address` is text type without keyword subfield, so we cannot push it down. // But the second condition can be translated to script, so the second one is pushed down. 
String query = @@ -215,7 +214,7 @@ public void testPartialPushdownFilterWithIsNull() throws IOException { @Test public void testSkipScriptEncodingOnExtendedFormat() throws IOException { - Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled()); + enabledOnlyWhenPushdownIsEnabled(); String query = "source=opensearch-sql_test_index_account | where address = '671 Bristol Street' and age -" + " 2 = 30 | fields firstname, age, address"; @@ -272,7 +271,7 @@ public void testExplainWithTimechartCount() throws IOException { @Test public void noPushDownForAggOnWindow() throws IOException { - Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled()); + enabledOnlyWhenPushdownIsEnabled(); String query = "source=opensearch-sql_test_index_account | patterns address method=BRAIN | stats count()" + " by patterns_field"; @@ -284,7 +283,7 @@ public void noPushDownForAggOnWindow() throws IOException { // Only for Calcite @Test public void supportPushDownScriptOnTextField() throws IOException { - Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled()); + enabledOnlyWhenPushdownIsEnabled(); String result = explainQueryToString( "explain source=opensearch-sql_test_index_account | where length(address) > 0 | eval" @@ -357,8 +356,9 @@ public void testExplainCountEvalComplex() throws IOException { assertJsonEqualsIgnoreId(expected, result); } + @Test public void testEventstatsDistinctCountExplain() throws IOException { - Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled()); + enabledOnlyWhenPushdownIsEnabled(); String query = "source=opensearch-sql_test_index_account | eventstats dc(state) as distinct_states"; var result = explainQueryToString(query); @@ -368,7 +368,7 @@ public void testEventstatsDistinctCountExplain() throws IOException { @Test public void testEventstatsDistinctCountFunctionExplain() throws IOException { - Assume.assumeTrue("This test is only for push down enabled", 
isPushdownEnabled()); + enabledOnlyWhenPushdownIsEnabled(); String query = "source=opensearch-sql_test_index_account | eventstats distinct_count(state) as" + " distinct_states by gender"; @@ -392,7 +392,7 @@ public void testExplainOnAggregationWithSumEnhancement() throws IOException { @Test public void testExplainRegexMatchInWhereWithScriptPushdown() throws IOException { - Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled()); + enabledOnlyWhenPushdownIsEnabled(); String query = String.format("source=%s | where regex_match(name, 'hello')", TEST_INDEX_STRINGS); var result = explainQueryToString(query); @@ -402,7 +402,7 @@ public void testExplainRegexMatchInWhereWithScriptPushdown() throws IOException @Test public void testExplainRegexMatchInEvalWithOutScriptPushdown() throws IOException { - Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled()); + enabledOnlyWhenPushdownIsEnabled(); String query = String.format( "source=%s |eval has_hello = regex_match(name, 'hello') | fields has_hello", @@ -451,6 +451,45 @@ public void testExplainOnFirstLast() throws IOException { TEST_INDEX_BANK))); } + + // Only for Calcite + @Test + public void testExplainOnEventstatsEarliestLatest() throws IOException { + String expected = loadExpectedPlan("explain_eventstats_earliest_latest.json"); + assertJsonEqualsIgnoreId( + expected, + explainQueryToString( + String.format( + "source=%s | eventstats earliest(message) as earliest_message, latest(message) as" + + " latest_message by server", + TEST_INDEX_LOGS))); + } + + // Only for Calcite + @Test + public void testExplainOnEventstatsEarliestLatestWithCustomTimeField() throws IOException { + String expected = loadExpectedPlan("explain_eventstats_earliest_latest_custom_time.json"); + assertJsonEqualsIgnoreId( + expected, + explainQueryToString( + String.format( + "source=%s | eventstats earliest(message, created_at) as earliest_message," + + " latest(message, created_at) as latest_message by 
level", + TEST_INDEX_LOGS))); + } + + // Only for Calcite + @Test + public void testExplainOnEventstatsEarliestLatestNoGroupBy() throws IOException { + String expected = loadExpectedPlan("explain_eventstats_earliest_latest_no_group.json"); + assertJsonEqualsIgnoreId( + expected, + explainQueryToString( + String.format( + "source=%s | eventstats earliest(message) as earliest_message, latest(message) as" + + " latest_message", + TEST_INDEX_LOGS))); + } + @Test public void testListAggregationExplain() throws IOException { String expected = loadExpectedPlan("explain_list_aggregation.json"); @@ -521,7 +559,7 @@ public void testExplainAppendCommand() throws IOException { @Test public void testPushdownLimitIntoAggregation() throws IOException { - Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled()); + enabledOnlyWhenPushdownIsEnabled(); String expected = loadExpectedPlan("explain_limit_agg_pushdown.json"); assertJsonEqualsIgnoreId( expected, diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteLikeQueryIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteLikeQueryIT.java index a0bc25b8caa..3a3b1bb0648 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteLikeQueryIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteLikeQueryIT.java @@ -6,7 +6,6 @@ package org.opensearch.sql.calcite.remote; import java.io.IOException; -import org.junit.Assume; import org.junit.Test; import org.opensearch.sql.ppl.LikeQueryIT; @@ -20,7 +19,7 @@ public void init() throws Exception { @Override @Test public void test_convert_field_text_to_keyword() throws IOException { - Assume.assumeTrue("Pushdown is not enabled, skipping this test.", isPushdownEnabled()); + enabledOnlyWhenPushdownIsEnabled(); super.test_convert_field_text_to_keyword(); } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLEventstatsIT.java 
b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLEventstatsIT.java index 15ad300ae6e..59c23a0eeed 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLEventstatsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLEventstatsIT.java @@ -11,7 +11,6 @@ import java.io.IOException; import java.util.List; import org.json.JSONObject; -import org.junit.Ignore; import org.junit.jupiter.api.Test; import org.opensearch.sql.ppl.PPLIntegTestCase; @@ -21,9 +20,10 @@ public void init() throws Exception { super.init(); enableCalcite(); + loadIndex(Index.BANK); loadIndex(Index.STATE_COUNTRY); loadIndex(Index.STATE_COUNTRY_WITH_NULL); - loadIndex(Index.BANK_TWO); + loadIndex(Index.LOGS); } @Test @@ -680,48 +680,63 @@ public void testEventstatDistinctCountWithNull() throws IOException { rows("Hello", "USA", "New York", 4, 2023, 30, 4)); } - @Ignore @Test public void testEventstatEarliestAndLatest() throws IOException { JSONObject actual = executeQuery( String.format( - "source=%s | eventstats earliest(birthdate), latest(birthdate) | head 1", - TEST_INDEX_BANK_TWO)); + "source=%s | eventstats earliest(message), latest(message) by server", + TEST_INDEX_LOGS)); verifySchema( actual, - schema("account_number", "bigint"), - schema("firstname", "string"), - schema("address", "string"), - schema("birthdate", "timestamp"), - schema("gender", "string"), - schema("city", "string"), - schema("lastname", "string"), - schema("balance", "bigint"), - schema("employer", "string"), - schema("state", "string"), - schema("age", "int"), - schema("email", "string"), - schema("male", "boolean"), - schema("earliest(birthdate)", "timestamp"), - schema("latest(birthdate)", "timestamp")); + schema("created_at", "timestamp"), + schema("server", "string"), + schema("@timestamp", "timestamp"), + schema("message", "string"), + schema("level", "string"), + schema("earliest(message)", "string"), + schema("latest(message)", "string")); 
verifyDataRows( actual, rows( - 1, - "Amber JOHnny", - "880 Holmes Lane", - "2017-10-23 00:00:00", - "M", - "Brogan", - "Duke Willmington", - 39225, - "Pyrami", - "IL", - 32, - "amberduke@pyrami.com", - true, - "1970-01-18 20:22:32", - "2018-08-19 00:00:00")); + "2023-01-05 00:00:00", + "server1", + "2023-01-01 00:00:00", + "Database connection failed", + "ERROR", + "Database connection failed", + "High memory usage"), + rows( + "2023-01-04 00:00:00", + "server2", + "2023-01-02 00:00:00", + "Service started", + "INFO", + "Service started", + "Backup completed"), + rows( + "2023-01-03 00:00:00", + "server1", + "2023-01-03 00:00:00", + "High memory usage", + "WARN", + "Database connection failed", + "High memory usage"), + rows( + "2023-01-02 00:00:00", + "server3", + "2023-01-04 00:00:00", + "Disk space low", + "ERROR", + "Disk space low", + "Disk space low"), + rows( + "2023-01-01 00:00:00", + "server2", + "2023-01-05 00:00:00", + "Backup completed", + "INFO", + "Service started", + "Backup completed")); } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteRelevanceFunctionIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteRelevanceFunctionIT.java index 03fca9bc615..84a5d69d9b6 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteRelevanceFunctionIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteRelevanceFunctionIT.java @@ -8,7 +8,6 @@ import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BEER; import java.io.IOException; -import org.junit.Assume; import org.opensearch.sql.ppl.RelevanceFunctionIT; public class CalciteRelevanceFunctionIT extends RelevanceFunctionIT { @@ -22,7 +21,7 @@ public void init() throws Exception { // optimization rule `FilterProjectTransposeRule` to push down the filter through the project. 
@Override public void not_pushdown_throws_exception() throws IOException { - Assume.assumeTrue("This test is only for push down enabled", isPushdownEnabled()); + enabledOnlyWhenPushdownIsEnabled(); String query1 = "SOURCE=" + TEST_INDEX_BEER diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java index 4e143154fe7..6c9e4bcd9a6 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java @@ -500,10 +500,7 @@ public void testDedupKeepEmptyFalsePushdown() throws IOException { @Test public void testSingleFieldRelevanceQueryFunctionExplain() throws IOException { - // This test is only applicable if pushdown is enabled - if (!isPushdownEnabled()) { - return; - } + enabledOnlyWhenPushdownIsEnabled(); String expected = isCalciteEnabled() @@ -519,10 +516,7 @@ public void testSingleFieldRelevanceQueryFunctionExplain() throws IOException { @Test public void testMultiFieldsRelevanceQueryFunctionExplain() throws IOException { - // This test is only applicable if pushdown is enabled - if (!isPushdownEnabled()) { - return; - } + enabledOnlyWhenPushdownIsEnabled(); String expected = isCalciteEnabled() diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/PPLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/ppl/PPLIntegTestCase.java index 4800bad4e06..f50d114c64d 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/PPLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/PPLIntegTestCase.java @@ -20,6 +20,7 @@ import org.json.JSONException; import org.json.JSONObject; import org.junit.Assert; +import org.junit.Assume; import org.junit.Rule; import org.opensearch.client.Request; import org.opensearch.client.RequestOptions; @@ -83,6 +84,22 @@ protected String executeCsvQuery(String query) throws IOException { return executeCsvQuery(query, true); } + protected void 
verifyExplainException(String query, String expectedErrorMessage) { + ResponseException e = assertThrows(ResponseException.class, () -> explainQueryToString(query)); + try { + String responseBody = getResponseBody(e.getResponse(), true); + JSONObject errorResponse = new JSONObject(responseBody); + String actualErrorMessage = errorResponse.getJSONObject("error").getString("details"); + assertEquals(expectedErrorMessage, actualErrorMessage); + } catch (IOException | JSONException ex) { + throw new RuntimeException("Failed to parse error response", ex); + } + } + + protected static String source(String index, String query) { + return String.format("source=%s | %s", index, query); + } + protected void timing(MapBuilder builder, String query, String ppl) throws IOException { executeQuery(ppl); // warm-up @@ -284,6 +301,10 @@ public boolean isPushdownEnabled() throws IOException { getClusterSetting(Settings.Key.CALCITE_PUSHDOWN_ENABLED.getKeyValue(), "transient")); } + protected void enabledOnlyWhenPushdownIsEnabled() throws IOException { + Assume.assumeTrue("This test is only for when push down is enabled", isPushdownEnabled()); + } + public void updatePushdownSettings() throws IOException { String pushdownEnabled = String.valueOf(GlobalPushdownConfig.enabled); assert !pushdownEnabled.isBlank() : "Pushdown enabled setting cannot be empty"; diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_eventstats_avg.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_eventstats_avg.json new file mode 100644 index 00000000000..7e8dc10e046 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_eventstats_avg.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical":"LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], 
distinct_states=[APPROX_DISTINCT_COUNT($7) OVER ()])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n", + "physical":"EnumerableLimit(fetch=[10000])\n EnumerableWindow(window#0=[window(aggs [APPROX_DISTINCT_COUNT($7)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" + } +} \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_eventstats_earliest_latest.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_eventstats_earliest_latest.json new file mode 100644 index 00000000000..60030435c34 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_eventstats_earliest_latest.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], earliest_message=[ARG_MIN($3, $2) OVER (PARTITION BY $1)], latest_message=[ARG_MAX($3, $2) OVER (PARTITION BY $1)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n", + "physical": "EnumerableLimit(fetch=[10000])\n EnumerableWindow(window#0=[window(partition {1} aggs [ARG_MIN($3, $2), ARG_MAX($3, $2)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[PROJECT->[created_at, server, @timestamp, message, level]], 
OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"created_at\",\"server\",\"@timestamp\",\"message\",\"level\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" + } +} diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_eventstats_earliest_latest_custom_time.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_eventstats_earliest_latest_custom_time.json new file mode 100644 index 00000000000..53d9934df12 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_eventstats_earliest_latest_custom_time.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], earliest_message=[ARG_MIN($3, $0) OVER (PARTITION BY $4)], latest_message=[ARG_MAX($3, $0) OVER (PARTITION BY $4)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n", + "physical": "EnumerableLimit(fetch=[10000])\n EnumerableWindow(window#0=[window(partition {4} aggs [ARG_MIN($3, $0), ARG_MAX($3, $0)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[PROJECT->[created_at, server, @timestamp, message, level]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"created_at\",\"server\",\"@timestamp\",\"message\",\"level\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" + } +} diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_eventstats_earliest_latest_no_group.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_eventstats_earliest_latest_no_group.json new file mode 100644 index 00000000000..5524ad54abf --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_eventstats_earliest_latest_no_group.json @@ 
-0,0 +1,6 @@ +{ + "calcite": { + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], earliest_message=[ARG_MIN($3, $2) OVER ()], latest_message=[ARG_MAX($3, $2) OVER ()])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n", + "physical": "EnumerableLimit(fetch=[10000])\n EnumerableWindow(window#0=[window(aggs [ARG_MIN($3, $2), ARG_MAX($3, $2)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]], PushDownContext=[[PROJECT->[created_at, server, @timestamp, message, level]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"created_at\",\"server\",\"@timestamp\",\"message\",\"level\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" + } +} diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_eventstats_avg.json b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_eventstats_avg.json new file mode 100644 index 00000000000..119aaaf8f55 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_eventstats_avg.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical":"LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], avg_balance=[/(SUM($3) OVER (), CAST(COUNT($3) OVER ()):DOUBLE NOT NULL)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n", + "physical":"EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..18=[{inputs}], expr#19=[CAST($t18):DOUBLE NOT NULL], expr#20=[/($t17, $t19)], proj#0..10=[{exprs}], avg_balance=[$t20])\n EnumerableWindow(window#0=[window(aggs [$SUM0($3), COUNT($3)])])\n 
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n" + } +} \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_eventstats_earliest_latest.json b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_eventstats_earliest_latest.json new file mode 100644 index 00000000000..e2ea6b3ddb5 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_eventstats_earliest_latest.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], earliest_message=[ARG_MIN($3, $2) OVER (PARTITION BY $1)], latest_message=[ARG_MAX($3, $2) OVER (PARTITION BY $1)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n", + "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..12=[{inputs}], proj#0..4=[{exprs}], $5=[$t11], $6=[$t12])\n EnumerableWindow(window#0=[window(partition {1} aggs [ARG_MIN($3, $2), ARG_MAX($3, $2)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n" + } +} diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_eventstats_earliest_latest_custom_time.json b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_eventstats_earliest_latest_custom_time.json new file mode 100644 index 00000000000..27849cce681 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_eventstats_earliest_latest_custom_time.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], earliest_message=[ARG_MIN($3, $0) OVER (PARTITION BY $4)], latest_message=[ARG_MAX($3, $0) OVER (PARTITION BY $4)])\n 
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n", + "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..12=[{inputs}], proj#0..4=[{exprs}], $5=[$t11], $6=[$t12])\n EnumerableWindow(window#0=[window(partition {4} aggs [ARG_MIN($3, $0), ARG_MAX($3, $0)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n" + } +} diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_eventstats_earliest_latest_no_group.json b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_eventstats_earliest_latest_no_group.json new file mode 100644 index 00000000000..034699c80e9 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_eventstats_earliest_latest_no_group.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(created_at=[$0], server=[$1], @timestamp=[$2], message=[$3], level=[$4], earliest_message=[ARG_MIN($3, $2) OVER ()], latest_message=[ARG_MAX($3, $2) OVER ()])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n", + "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..12=[{inputs}], proj#0..4=[{exprs}], $5=[$t11], $6=[$t12])\n EnumerableWindow(window#0=[window(aggs [ARG_MIN($3, $2), ARG_MAX($3, $2)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_logs]])\n" + } +} diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 6b3487f3d97..8d13821d297 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -573,14 +573,13 @@ statsAggTerm // aggregation functions statsFunction - : statsFunctionName LT_PRTHS valueExpression RT_PRTHS # statsFunctionCall - | (COUNT | C) LT_PRTHS evalExpression RT_PRTHS # countEvalFunctionCall + : (COUNT | C) LT_PRTHS evalExpression RT_PRTHS 
# countEvalFunctionCall | (COUNT | C) (LT_PRTHS RT_PRTHS)? # countAllFunctionCall | PERCENTILE_SHORTCUT LT_PRTHS valueExpression RT_PRTHS # percentileShortcutFunctionCall | (DISTINCT_COUNT | DC | DISTINCT_COUNT_APPROX) LT_PRTHS valueExpression RT_PRTHS # distinctCountFunctionCall | takeAggFunction # takeAggFunctionCall | percentileApproxFunction # percentileApproxFunctionCall - | earliestLatestFunction # earliestLatestFunctionCall + | statsFunctionName LT_PRTHS functionArgs RT_PRTHS # statsFunctionCall ; statsFunctionName @@ -598,16 +597,11 @@ statsFunctionName | MEDIAN | LIST | FIRST + | EARLIEST + | LATEST | LAST ; -earliestLatestFunction - : (EARLIEST | LATEST) LT_PRTHS valueExpression (COMMA timeField = valueExpression)? RT_PRTHS - ; - - - - takeAggFunction : TAKE LT_PRTHS fieldExpression (COMMA size = integerLiteral)? RT_PRTHS ; diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java index 6451ea258ca..c055a95d55b 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java @@ -267,7 +267,16 @@ public UnresolvedExpression visitSortField(SortFieldContext ctx) { /** Aggregation function. 
*/ @Override public UnresolvedExpression visitStatsFunctionCall(StatsFunctionCallContext ctx) { - return new AggregateFunction(ctx.statsFunctionName().getText(), visit(ctx.valueExpression())); + return buildAggregateFunction( + ctx.statsFunctionName().getText(), ctx.functionArgs().functionArg()); + } + + private AggregateFunction buildAggregateFunction( + String functionName, List args) { + List unresolvedArgs = + args.stream().map(this::visitFunctionArg).collect(Collectors.toList()); + return new AggregateFunction( + functionName, unresolvedArgs.get(0), unresolvedArgs.subList(1, unresolvedArgs.size())); } @Override @@ -345,30 +354,6 @@ public UnresolvedExpression visitPercentileShortcutFunctionCall( new UnresolvedArgument("percent", AstDSL.doubleLiteral(percent)))); } - public UnresolvedExpression visitEarliestLatestFunctionCall( - OpenSearchPPLParser.EarliestLatestFunctionCallContext ctx) { - return visit(ctx.earliestLatestFunction()); - } - - @Override - public UnresolvedExpression visitEarliestLatestFunction( - OpenSearchPPLParser.EarliestLatestFunctionContext ctx) { - String functionName = ctx.EARLIEST() != null ? "earliest" : "latest"; - UnresolvedExpression valueField = visit(ctx.valueExpression(0)); - - if (ctx.timeField != null) { - // Two parameters: earliest(field, time_field) or latest(field, time_field) - UnresolvedExpression timeField = visit(ctx.timeField); - return new AggregateFunction( - functionName, - valueField, - Collections.singletonList(new UnresolvedArgument("time_field", timeField))); - } else { - // Single parameter: earliest(field) or latest(field) - uses default @timestamp - return new AggregateFunction(functionName, valueField); - } - } - /** Case function. 
*/ @Override public UnresolvedExpression visitCaseFunctionCall( @@ -405,6 +390,12 @@ public UnresolvedExpression visitEvalFunctionCall(EvalFunctionCallContext ctx) { return buildFunction(mappedName, ctx.functionArgs().functionArg()); } + private Function buildFunction( + String functionName, List args) { + return new Function( + functionName, args.stream().map(this::visitFunctionArg).collect(Collectors.toList())); + } + /** Cast function. */ @Override public UnresolvedExpression visitDataTypeFunctionCall(DataTypeFunctionCallContext ctx) { @@ -416,12 +407,6 @@ public UnresolvedExpression visitConvertedDataType(ConvertedDataTypeContext ctx) return AstDSL.stringLiteral(ctx.getText()); } - private Function buildFunction( - String functionName, List args) { - return new Function( - functionName, args.stream().map(this::visitFunctionArg).collect(Collectors.toList())); - } - /** * Rewrites sum(a, b, c, ...) to (a + b + c + ...) and avg(a, b, c, ...) to (a + b + c + ...) / n * Uses balanced tree construction to avoid deep recursion with large argument lists. @@ -697,6 +682,7 @@ public UnresolvedExpression visitBetween(OpenSearchPPLParser.BetweenContext ctx) public UnresolvedExpression visitWindowFunction(OpenSearchPPLParser.WindowFunctionContext ctx) { Function f = buildFunction(ctx.windowFunctionName().getText(), ctx.functionArgs().functionArg()); + // In PPL eventstats command, all window functions have the same partition and order spec. 
return new WindowFunction(f); } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java index 01bfafbf3e8..cfa38811829 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAbstractTest.java @@ -33,6 +33,7 @@ import org.apache.calcite.tools.RelBuilder; import org.apache.calcite.tools.RelRunners; import org.apache.commons.lang3.StringUtils; +import org.junit.Assert; import org.junit.Before; import org.opensearch.sql.ast.Node; import org.opensearch.sql.ast.statement.Query; @@ -40,6 +41,7 @@ import org.opensearch.sql.calcite.CalciteRelNodeVisitor; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.common.setting.Settings.Key; +import org.opensearch.sql.exception.ExpressionEvaluationException; import org.opensearch.sql.ppl.antlr.PPLSyntaxParser; import org.opensearch.sql.ppl.parser.AstBuilder; import org.opensearch.sql.ppl.parser.AstStatementBuilder; @@ -182,4 +184,9 @@ public void verifyErrorMessageContains(Throwable t, String msg) { String stackTrace = getStackTrace(t); assertThat(String.format("Actual stack trace was:\n%s", stackTrace), stackTrace.contains(msg)); } + + protected void verifyQueryThrowsException(String query, String expectedErrorMessage) { + Exception e = Assert.assertThrows(ExpressionEvaluationException.class, () -> getRelNode(query)); + verifyErrorMessageContains(e, expectedErrorMessage); + } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAggregateFunctionTypeTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAggregateFunctionTypeTest.java new file mode 100644 index 00000000000..1e1109c256f --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAggregateFunctionTypeTest.java @@ -0,0 +1,181 @@ +/* + * Copyright OpenSearch Contributors + * 
SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl.calcite; + +import org.apache.calcite.test.CalciteAssert; +import org.junit.Test; + +public class CalcitePPLAggregateFunctionTypeTest extends CalcitePPLAbstractTest { + + public CalcitePPLAggregateFunctionTypeTest() { + super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL); + } + + @Test + public void testAvgWithWrongArgType() { + verifyQueryThrowsException( + "source=EMP | stats avg(ENAME) as avg_name", + "Aggregation function AVG expects field type {[INTEGER]|[DOUBLE]}, but got [STRING]"); + } + + @Test + public void testVarsampWithWrongArgType() { + verifyQueryThrowsException( + "source=EMP | stats var_samp(ENAME) as varsamp_name", + "Aggregation function VARSAMP expects field type {[INTEGER]|[DOUBLE]}, but got [STRING]"); + } + + @Test + public void testVarpopWithWrongArgType() { + verifyQueryThrowsException( + "source=EMP | stats var_pop(ENAME) as varpop_name", + "Aggregation function VARPOP expects field type {[INTEGER]|[DOUBLE]}, but got [STRING]"); + } + + @Test + public void testStddevSampWithWrongArgType() { + verifyQueryThrowsException( + "source=EMP | stats stddev_samp(ENAME) as stddev_name", + "Aggregation function STDDEV_SAMP expects field type {[INTEGER]|[DOUBLE]}, but got" + + " [STRING]"); + } + + @Test + public void testStddevPopWithWrongArgType() { + verifyQueryThrowsException( + "source=EMP | stats stddev_pop(ENAME) as stddev_name", + "Aggregation function STDDEV_POP expects field type {[INTEGER]|[DOUBLE]}, but got" + + " [STRING]"); + } + + @Test + public void testPercentileApproxWithWrongArgType() { + // First argument should be numeric + verifyQueryThrowsException( + "source=EMP | stats percentile_approx(ENAME, 50) as percentile", + "Aggregation function PERCENTILE_APPROX expects field type and additional arguments" + + " 
{[INTEGER,INTEGER]|[INTEGER,DOUBLE]|[DOUBLE,INTEGER]|[DOUBLE,DOUBLE]|[INTEGER,INTEGER,INTEGER]|[INTEGER,INTEGER,DOUBLE]|[INTEGER,DOUBLE,INTEGER]|[INTEGER,DOUBLE,DOUBLE]|[DOUBLE,INTEGER,INTEGER]|[DOUBLE,INTEGER,DOUBLE]|[DOUBLE,DOUBLE,INTEGER]|[DOUBLE,DOUBLE,DOUBLE]}," + + " but got [STRING,INTEGER]"); + } + + @Test + public void testListFunctionWithArrayArgType() { + // Test LIST function with array expression (which is not a supported scalar type) + verifyQueryThrowsException( + "source=EMP | stats list(array(ENAME, JOB)) as name_list", + "Aggregation function LIST expects field type" + + " {[BYTE]|[SHORT]|[INTEGER]|[LONG]|[FLOAT]|[DOUBLE]|[STRING]|[BOOLEAN]|[DATE]|[TIME]|[TIMESTAMP]|[IP]|[BINARY]}," + + " but got [ARRAY]"); + } + + @Test + public void testCountWithExtraParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | stats count(EMPNO, DEPTNO)", + "Aggregation function COUNT expects field type and additional arguments {[]|[ANY]}," + + " but got [SHORT,BYTE]"); + } + + @Test + public void testAvgWithExtraParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | stats avg(EMPNO, DEPTNO)", + "Aggregation function AVG expects field type and additional arguments {[INTEGER]|[DOUBLE]}," + + " but got [SHORT,BYTE]"); + } + + @Test + public void testSumWithExtraParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | stats sum(EMPNO, DEPTNO)", + "Aggregation function SUM expects field type and additional arguments {[INTEGER]|[DOUBLE]}," + + " but got [SHORT,BYTE]"); + } + + @Test + public void testMinWithExtraParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | stats min(EMPNO, DEPTNO)", + "Aggregation function MIN expects field type and additional arguments {[COMPARABLE_TYPE]}," + + " but got [SHORT,BYTE]"); + } + + @Test + public void testMaxWithExtraParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | stats max(EMPNO, DEPTNO)", + "Aggregation function MAX 
expects field type and additional arguments {[COMPARABLE_TYPE]}," + + " but got [SHORT,BYTE]"); + } + + @Test + public void testVarSampWithExtraParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | stats var_samp(EMPNO, DEPTNO)", + "Aggregation function VARSAMP expects field type and additional arguments" + + " {[INTEGER]|[DOUBLE]}, but got [SHORT,BYTE]"); + } + + @Test + public void testVarPopWithExtraParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | stats var_pop(EMPNO, DEPTNO)", + "Aggregation function VARPOP expects field type and additional arguments" + + " {[INTEGER]|[DOUBLE]}, but got [SHORT,BYTE]"); + } + + @Test + public void testStddevSampWithExtraParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | stats stddev_samp(EMPNO, DEPTNO)", + "Aggregation function STDDEV_SAMP expects field type and additional arguments" + + " {[INTEGER]|[DOUBLE]}, but got [SHORT,BYTE]"); + } + + @Test + public void testStddevPopWithExtraParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | stats stddev_pop(EMPNO, DEPTNO)", + "Aggregation function STDDEV_POP expects field type and additional arguments" + + " {[INTEGER]|[DOUBLE]}, but got [SHORT,BYTE]"); + } + + @Test + public void testPercentileWithMissingParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | stats percentile(EMPNO)", + "Aggregation function PERCENTILE_APPROX expects field type" + + " {[INTEGER,INTEGER]|[INTEGER,DOUBLE]|[DOUBLE,INTEGER]|[DOUBLE,DOUBLE]|[INTEGER,INTEGER,INTEGER]|[INTEGER,INTEGER,DOUBLE]|[INTEGER,DOUBLE,INTEGER]|[INTEGER,DOUBLE,DOUBLE]|[DOUBLE,INTEGER,INTEGER]|[DOUBLE,INTEGER,DOUBLE]|[DOUBLE,DOUBLE,INTEGER]|[DOUBLE,DOUBLE,DOUBLE]}," + + " but got [SHORT]"); + } + + @Test + public void testPercentileWithInvalidParameterTypesThrowsException() { + verifyQueryThrowsException( + "source=EMP | stats percentile(EMPNO, 50, ENAME)", + "Aggregation function PERCENTILE_APPROX expects field type 
and additional arguments" + + " {[INTEGER,INTEGER]|[INTEGER,DOUBLE]|[DOUBLE,INTEGER]|[DOUBLE,DOUBLE]|[INTEGER,INTEGER,INTEGER]|[INTEGER,INTEGER,DOUBLE]|[INTEGER,DOUBLE,INTEGER]|[INTEGER,DOUBLE,DOUBLE]|[DOUBLE,INTEGER,INTEGER]|[DOUBLE,INTEGER,DOUBLE]|[DOUBLE,DOUBLE,INTEGER]|[DOUBLE,DOUBLE,DOUBLE]}," + + " but got [SHORT,INTEGER,STRING]"); + } + + @Test + public void testEarliestWithTooManyParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | stats earliest(ENAME, HIREDATE, JOB)", + "Aggregation function EARLIEST expects field type and additional arguments" + + " {[ANY]|[ANY,ANY]}, but got" + + " [STRING,DATE,STRING]"); + } + + @Test + public void testLatestWithTooManyParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | stats latest(ENAME, HIREDATE, JOB)", + "Aggregation function LATEST expects field type and additional arguments" + + " {[ANY]|[ANY,ANY]}, but got" + + " [STRING,DATE,STRING]"); + } +} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEarliestLatestTestUtils.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEarliestLatestTestUtils.java new file mode 100644 index 00000000000..90b4376379b --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEarliestLatestTestUtils.java @@ -0,0 +1,124 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl.calcite; + +import com.google.common.collect.ImmutableList; +import java.sql.Date; +import lombok.RequiredArgsConstructor; +import org.apache.calcite.DataContext; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import 
org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Statistics; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.type.SqlTypeName; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Shared test utilities for earliest/latest function tests. */ +public class CalcitePPLEarliestLatestTestUtils { + + /** + * Creates test data for LOGS table with @timestamp and created_at fields. Note: @timestamp and + * created_at have different orderings to test explicit field usage. + */ + public static ImmutableList createLogsTestData() { + return ImmutableList.of( + new Object[] { + "server1", + "ERROR", + "Database connection failed", + Date.valueOf("2023-01-01"), + Date.valueOf("2023-01-05") + }, + new Object[] { + "server2", + "INFO", + "Service started", + Date.valueOf("2023-01-02"), + Date.valueOf("2023-01-04") + }, + new Object[] { + "server1", + "WARN", + "High memory usage", + Date.valueOf("2023-01-03"), + Date.valueOf("2023-01-03") + }, + new Object[] { + "server3", + "ERROR", + "Disk space low", + Date.valueOf("2023-01-04"), + Date.valueOf("2023-01-02") + }, + new Object[] { + "server2", + "INFO", + "Backup completed", + Date.valueOf("2023-01-05"), + Date.valueOf("2023-01-01") + }); + } + + /** Custom table implementation with @timestamp field for earliest/latest testing. 
*/ + @RequiredArgsConstructor + public static class LogsTable implements ScannableTable { + private final ImmutableList rows; + + protected final RelProtoDataType protoRowType = + factory -> + factory + .builder() + .add("server", SqlTypeName.VARCHAR) + .add("level", SqlTypeName.VARCHAR) + .add("message", SqlTypeName.VARCHAR) + .add("@timestamp", SqlTypeName.DATE) + .add("created_at", SqlTypeName.DATE) + .build(); + + @Override + public Enumerable<@Nullable Object[]> scan(DataContext root) { + return Linq4j.asEnumerable(rows); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return protoRowType.apply(typeFactory); + } + + @Override + public Statistic getStatistic() { + return Statistics.of(0d, ImmutableList.of(), RelCollations.createSingleton(0)); + } + + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; + } + + @Override + public boolean isRolledUp(String column) { + return false; + } + + @Override + public boolean rolledUpColumnValidInsideAgg( + String column, + SqlCall call, + @Nullable SqlNode parent, + @Nullable CalciteConnectionConfig config) { + return false; + } + } +} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEventstatsEarliestLatestTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEventstatsEarliestLatestTest.java new file mode 100644 index 00000000000..d91a8638cb2 --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEventstatsEarliestLatestTest.java @@ -0,0 +1,191 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl.calcite; + +import java.util.List; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.test.CalciteAssert; +import org.apache.calcite.tools.Frameworks; +import 
org.apache.calcite.tools.Programs; +import org.junit.Test; + +public class CalcitePPLEventstatsEarliestLatestTest extends CalcitePPLAbstractTest { + public CalcitePPLEventstatsEarliestLatestTest() { + super(CalciteAssert.SchemaSpec.POST); + } + + @Override + protected Frameworks.ConfigBuilder config(CalciteAssert.SchemaSpec... schemaSpecs) { + final SchemaPlus rootSchema = Frameworks.createRootSchema(true); + final SchemaPlus schema = CalciteAssert.addSchema(rootSchema, schemaSpecs); + + schema.add( + "LOGS", + new CalcitePPLEarliestLatestTestUtils.LogsTable( + CalcitePPLEarliestLatestTestUtils.createLogsTestData())); + + return Frameworks.newConfigBuilder() + .parserConfig(SqlParser.Config.DEFAULT) + .defaultSchema(schema) + .traitDefs((List) null) + .programs(Programs.heuristicJoinOrder(Programs.RULE_SET, true, 2)); + } + + @Test + public void testEventstatsEarliestWithoutSecondArgument() { + String ppl = "source=LOGS | eventstats earliest(message) as earliest_message"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(server=[$0], level=[$1], message=[$2], @timestamp=[$3], created_at=[$4]," + + " earliest_message=[ARG_MIN($2, $3) OVER ()])\n" + + " LogicalTableScan(table=[[POST, LOGS]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `server`, `level`, `message`, `@timestamp`, `created_at`, MIN_BY (`message`," + + " `@timestamp`) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)" + + " `earliest_message`\n" + + "FROM `POST`.`LOGS`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testEventstatsLatestWithoutSecondArgument() { + String ppl = "source=LOGS | eventstats latest(message) as latest_message"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(server=[$0], level=[$1], message=[$2], @timestamp=[$3], created_at=[$4]," + + " latest_message=[ARG_MAX($2, $3) OVER ()])\n" + + " LogicalTableScan(table=[[POST, LOGS]])\n"; + 
verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `server`, `level`, `message`, `@timestamp`, `created_at`, MAX_BY (`message`," + + " `@timestamp`) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)" + + " `latest_message`\n" + + "FROM `POST`.`LOGS`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testEventstatsEarliestByServerWithoutSecondArgument() { + String ppl = "source=LOGS | eventstats earliest(message) as earliest_message by server"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(server=[$0], level=[$1], message=[$2], @timestamp=[$3], created_at=[$4]," + + " earliest_message=[ARG_MIN($2, $3) OVER (PARTITION BY $0)])\n" + + " LogicalTableScan(table=[[POST, LOGS]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `server`, `level`, `message`, `@timestamp`, `created_at`, MIN_BY (`message`," + + " `@timestamp`) OVER (PARTITION BY `server` RANGE BETWEEN UNBOUNDED PRECEDING AND" + + " UNBOUNDED FOLLOWING) `earliest_message`\n" + + "FROM `POST`.`LOGS`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testEventstatsLatestByServerWithoutSecondArgument() { + String ppl = "source=LOGS | eventstats latest(message) as latest_message by server"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(server=[$0], level=[$1], message=[$2], @timestamp=[$3], created_at=[$4]," + + " latest_message=[ARG_MAX($2, $3) OVER (PARTITION BY $0)])\n" + + " LogicalTableScan(table=[[POST, LOGS]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `server`, `level`, `message`, `@timestamp`, `created_at`, MAX_BY (`message`," + + " `@timestamp`) OVER (PARTITION BY `server` RANGE BETWEEN UNBOUNDED PRECEDING AND" + + " UNBOUNDED FOLLOWING) `latest_message`\n" + + "FROM `POST`.`LOGS`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void 
testEventstatsEarliestWithOtherAggregatesWithoutSecondArgument() { + String ppl = + "source=LOGS | eventstats earliest(message) as earliest_message, count() as cnt by server"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(server=[$0], level=[$1], message=[$2], @timestamp=[$3], created_at=[$4]," + + " earliest_message=[ARG_MIN($2, $3) OVER (PARTITION BY $0)], cnt=[COUNT() OVER" + + " (PARTITION BY $0)])\n" + + " LogicalTableScan(table=[[POST, LOGS]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `server`, `level`, `message`, `@timestamp`, `created_at`, MIN_BY (`message`," + + " `@timestamp`) OVER (PARTITION BY `server` RANGE BETWEEN UNBOUNDED PRECEDING AND" + + " UNBOUNDED FOLLOWING) `earliest_message`, COUNT(*) OVER (PARTITION BY `server` RANGE" + + " BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) `cnt`\n" + + "FROM `POST`.`LOGS`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testEventstatsEarliestWithExplicitTimestampField() { + String ppl = "source=LOGS | eventstats earliest(message, created_at) as earliest_message"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(server=[$0], level=[$1], message=[$2], @timestamp=[$3], created_at=[$4]," + + " earliest_message=[ARG_MIN($2, $4) OVER ()])\n" + + " LogicalTableScan(table=[[POST, LOGS]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `server`, `level`, `message`, `@timestamp`, `created_at`, MIN_BY (`message`," + + " `created_at`) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)" + + " `earliest_message`\n" + + "FROM `POST`.`LOGS`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testEventstatsLatestWithExplicitTimestampField() { + String ppl = "source=LOGS | eventstats latest(message, created_at) as latest_message"; + RelNode root = getRelNode(ppl); + String expectedLogical = + 
"LogicalProject(server=[$0], level=[$1], message=[$2], @timestamp=[$3], created_at=[$4]," + + " latest_message=[ARG_MAX($2, $4) OVER ()])\n" + + " LogicalTableScan(table=[[POST, LOGS]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `server`, `level`, `message`, `@timestamp`, `created_at`, MAX_BY (`message`," + + " `created_at`) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)" + + " `latest_message`\n" + + "FROM `POST`.`LOGS`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testEventstatsEarliestLatestCombined() { + String ppl = + "source=LOGS | eventstats earliest(message) as earliest_msg, latest(message) as latest_msg" + + " by server"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalProject(server=[$0], level=[$1], message=[$2], @timestamp=[$3], created_at=[$4]," + + " earliest_msg=[ARG_MIN($2, $3) OVER (PARTITION BY $0)], latest_msg=[ARG_MAX($2, $3)" + + " OVER (PARTITION BY $0)])\n" + + " LogicalTableScan(table=[[POST, LOGS]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `server`, `level`, `message`, `@timestamp`, `created_at`, MIN_BY (`message`," + + " `@timestamp`) OVER (PARTITION BY `server` RANGE BETWEEN UNBOUNDED PRECEDING AND" + + " UNBOUNDED FOLLOWING) `earliest_msg`, MAX_BY (`message`, `@timestamp`) OVER" + + " (PARTITION BY `server` RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)" + + " `latest_msg`\n" + + "FROM `POST`.`LOGS`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } +} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEventstatsTypeTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEventstatsTypeTest.java new file mode 100644 index 00000000000..a6535755435 --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEventstatsTypeTest.java @@ -0,0 +1,143 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: 
Apache-2.0 + */ + +package org.opensearch.sql.ppl.calcite; + +import org.apache.calcite.test.CalciteAssert; +import org.junit.Test; + +public class CalcitePPLEventstatsTypeTest extends CalcitePPLAbstractTest { + + public CalcitePPLEventstatsTypeTest() { + super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL); + } + + @Test + public void testCountWithExtraParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | eventstats count(EMPNO, DEPTNO)", + "Aggregation function COUNT expects field type and additional arguments {[]|[ANY]}," + + " but got [SHORT,BYTE]"); + } + + @Test + public void testAvgWithExtraParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | eventstats avg(EMPNO, DEPTNO)", + "Aggregation function AVG expects field type and additional arguments {[INTEGER]|[DOUBLE]}," + + " but got [SHORT,BYTE]"); + } + + @Test + public void testSumWithExtraParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | eventstats sum(EMPNO, DEPTNO)", + "Aggregation function SUM expects field type and additional arguments {[INTEGER]|[DOUBLE]}," + + " but got [SHORT,BYTE]"); + } + + @Test + public void testMinWithExtraParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | eventstats min(EMPNO, DEPTNO)", + "Aggregation function MIN expects field type and additional arguments {[COMPARABLE_TYPE]}," + + " but got [SHORT,BYTE]"); + } + + @Test + public void testMaxWithExtraParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | eventstats max(EMPNO, DEPTNO)", + "Aggregation function MAX expects field type and additional arguments {[COMPARABLE_TYPE]}," + + " but got [SHORT,BYTE]"); + } + + @Test + public void testVarSampWithExtraParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | eventstats var_samp(EMPNO, DEPTNO)", + "Aggregation function VARSAMP expects field type and additional arguments" + + " {[INTEGER]|[DOUBLE]}, but got [SHORT,BYTE]"); + } + + @Test + 
public void testVarPopWithExtraParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | eventstats var_pop(EMPNO, DEPTNO)", + "Aggregation function VARPOP expects field type and additional arguments" + + " {[INTEGER]|[DOUBLE]}, but got [SHORT,BYTE]"); + } + + @Test + public void testStddevSampWithExtraParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | eventstats stddev_samp(EMPNO, DEPTNO)", + "Aggregation function STDDEV_SAMP expects field type and additional arguments" + + " {[INTEGER]|[DOUBLE]}, but got [SHORT,BYTE]"); + } + + @Test + public void testStddevPopWithExtraParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | eventstats stddev_pop(EMPNO, DEPTNO)", + "Aggregation function STDDEV_POP expects field type and additional arguments" + + " {[INTEGER]|[DOUBLE]}, but got [SHORT,BYTE]"); + } + + @Test + public void testEarliestWithTooManyParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | eventstats earliest(ENAME, HIREDATE, JOB)", + "Aggregation function EARLIEST expects field type and additional arguments" + + " {[ANY]|[ANY,ANY]}, but got" + + " [STRING,DATE,STRING]"); + } + + @Test + public void testLatestWithTooManyParametersThrowsException() { + verifyQueryThrowsException( + "source=EMP | eventstats latest(ENAME, HIREDATE, JOB)", + "Aggregation function LATEST expects field type and additional arguments" + + " {[ANY]|[ANY,ANY]}, but got" + + " [STRING,DATE,STRING]"); + } + + @Test + public void testAvgWithWrongArgType() { + verifyQueryThrowsException( + "source=EMP | eventstats avg(ENAME) as avg_name", + "Aggregation function AVG expects field type {[INTEGER]|[DOUBLE]}, but got [STRING]"); + } + + @Test + public void testVarsampWithWrongArgType() { + verifyQueryThrowsException( + "source=EMP | eventstats var_samp(ENAME) as varsamp_name", + "Aggregation function VARSAMP expects field type {[INTEGER]|[DOUBLE]}, but got [STRING]"); + } + + @Test + public void 
testVarpopWithWrongArgType() { + verifyQueryThrowsException( + "source=EMP | eventstats var_pop(ENAME) as varpop_name", + "Aggregation function VARPOP expects field type {[INTEGER]|[DOUBLE]}, but got [STRING]"); + } + + @Test + public void testStddevSampWithWrongArgType() { + verifyQueryThrowsException( + "source=EMP | eventstats stddev_samp(ENAME) as stddev_name", + "Aggregation function STDDEV_SAMP expects field type {[INTEGER]|[DOUBLE]}, but got" + + " [STRING]"); + } + + @Test + public void testStddevPopWithWrongArgType() { + verifyQueryThrowsException( + "source=EMP | eventstats stddev_pop(ENAME) as stddev_name", + "Aggregation function STDDEV_POP expects field type {[INTEGER]|[DOUBLE]}, but got" + + " [STRING]"); + } +} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLFunctionTypeTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLFunctionTypeTest.java index 907abf76a89..500d6873f89 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLFunctionTypeTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLFunctionTypeTest.java @@ -8,7 +8,6 @@ import org.apache.calcite.test.CalciteAssert; import org.junit.Assert; import org.junit.Test; -import org.opensearch.sql.exception.ExpressionEvaluationException; public class CalcitePPLFunctionTypeTest extends CalcitePPLAbstractTest { @@ -18,9 +17,9 @@ public CalcitePPLFunctionTypeTest() { @Test public void testLowerWithIntegerType() { - String ppl = "source=EMP | eval lower_name = lower(EMPNO) | fields lower_name"; - Throwable t = Assert.assertThrows(ExpressionEvaluationException.class, () -> getRelNode(ppl)); - verifyErrorMessageContains(t, "LOWER function expects {[STRING]}, but got [SHORT]"); + verifyQueryThrowsException( + "source=EMP | eval lower_name = lower(EMPNO) | fields lower_name", + "LOWER function expects {[STRING]}, but got [SHORT]"); } @Test @@ -30,26 +29,23 @@ public void testTimeDiffWithUdtInputType() { String timePpl = 
"source=EMP | eval time_diff = timediff(time('13:00:00'), time('12:00:06')) | fields" + " time_diff"; - String wrongPpl = "source=EMP | eval time_diff = timediff(12, '2009-12-10') | fields time_diff"; getRelNode(strPpl); getRelNode(timePpl); - Throwable t = Assert.assertThrows(Exception.class, () -> getRelNode(wrongPpl)); - verifyErrorMessageContains( - t, "TIMEDIFF function expects {[TIME,TIME]}, but got [INTEGER,STRING]"); + verifyQueryThrowsException( + "source=EMP | eval time_diff = timediff(12, '2009-12-10') | fields time_diff", + "TIMEDIFF function expects {[TIME,TIME]}, but got [INTEGER,STRING]"); } @Test public void testComparisonWithDifferentType() { getRelNode("source=EMP | where EMPNO > 6 | fields ENAME"); getRelNode("source=EMP | where ENAME <= 'Jack' | fields ENAME"); - String ppl = "source=EMP | where ENAME < 6 | fields ENAME"; - Throwable t = Assert.assertThrows(ExpressionEvaluationException.class, () -> getRelNode(ppl)); - verifyErrorMessageContains( - t, + verifyQueryThrowsException( + "source=EMP | where ENAME < 6 | fields ENAME", // Temporary fix for the error message as LESS function has two variants. Will remove // [IP,IP] when merging the two variants. 
- "LESS function expects {[IP,IP],[COMPARABLE_TYPE,COMPARABLE_TYPE]}," - + " but got [STRING,INTEGER]"); + "LESS function expects {[IP,IP],[COMPARABLE_TYPE,COMPARABLE_TYPE]}, but got" + + " [STRING,INTEGER]"); } @Test @@ -70,11 +66,9 @@ public void testCoalesceWithDifferentType() { public void testSubstringWithWrongType() { getRelNode("source=EMP | eval sub_name = substring(ENAME, 1, 3) | fields sub_name"); getRelNode("source=EMP | eval sub_name = substring(ENAME, 1) | fields sub_name"); - String ppl = "source=EMP | eval sub_name = substring(ENAME, 1, '3') | fields sub_name"; - Throwable t = Assert.assertThrows(ExpressionEvaluationException.class, () -> getRelNode(ppl)); - verifyErrorMessageContains( - t, - "SUBSTRING function expects {[STRING,INTEGER],[STRING,INTEGER,INTEGER]}, but got" + verifyQueryThrowsException( + "source=EMP | eval sub_name = substring(ENAME, 1, '3') | fields sub_name", + "SUBSTRING function expects {[STRING,INTEGER]|[STRING,INTEGER,INTEGER]}, but got" + " [STRING,INTEGER,STRING]"); } @@ -82,230 +76,103 @@ public void testSubstringWithWrongType() { public void testIfWithWrongType() { getRelNode("source=EMP | eval if_name = if(EMPNO > 6, 'Jack', ENAME) | fields if_name"); getRelNode("source=EMP | eval if_name = if(EMPNO > 6, EMPNO, DEPTNO) | fields if_name"); - String pplWrongCondition = "source=EMP | eval if_name = if(EMPNO, 1, DEPTNO) | fields if_name"; - Throwable t1 = - Assert.assertThrows( - ExpressionEvaluationException.class, () -> getRelNode(pplWrongCondition)); - verifyErrorMessageContains( - t1, "IF function expects {[BOOLEAN,ANY,ANY]}, but got [SHORT,INTEGER,BYTE]"); - String pplIncompatibleType = - "source=EMP | eval if_name = if(EMPNO > 6, 'Jack', 1) | fields if_name"; - Throwable t2 = - Assert.assertThrows( - ExpressionEvaluationException.class, () -> getRelNode(pplIncompatibleType)); - verifyErrorMessageContains( - t2, + verifyQueryThrowsException( + "source=EMP | eval if_name = if(EMPNO, 1, DEPTNO) | fields if_name", + "IF 
function expects {[BOOLEAN,ANY,ANY]}, but got [SHORT,INTEGER,BYTE]"); + verifyQueryThrowsException( + "source=EMP | eval if_name = if(EMPNO > 6, 'Jack', 1) | fields if_name", "Cannot resolve function: IF, arguments: [BOOLEAN,STRING,INTEGER], caused by: Can't find" + " leastRestrictive type for [VARCHAR, INTEGER]"); } @Test public void testTimestampWithWrongArg() { - String ppl = + verifyQueryThrowsException( "source=EMP | eval timestamp = timestamp('2020-08-26 13:49:00', 2009) | fields timestamp |" - + " head 1"; - Throwable t = Assert.assertThrows(ExpressionEvaluationException.class, () -> getRelNode(ppl)); - verifyErrorMessageContains( - t, - "TIMESTAMP function expects {" - + "[STRING],[TIMESTAMP],[DATE],[TIME],[STRING,STRING],[TIMESTAMP,TIMESTAMP],[TIMESTAMP,DATE]," - + "[TIMESTAMP,TIME],[DATE,TIMESTAMP],[DATE,DATE],[DATE,TIME],[TIME,TIMESTAMP],[TIME,DATE]," - + "[TIME,TIME],[STRING,TIMESTAMP],[STRING,DATE],[STRING,TIME],[TIMESTAMP,STRING],[DATE,STRING],[TIME,STRING]}," + + " head 1", + "TIMESTAMP function expects" + + " {[STRING]|[TIMESTAMP]|[DATE]|[TIME]|[STRING,STRING]|[TIMESTAMP,TIMESTAMP]|[TIMESTAMP,DATE]|[TIMESTAMP,TIME]|[DATE,TIMESTAMP]|[DATE,DATE]|[DATE,TIME]|[TIME,TIMESTAMP]|[TIME,DATE]|[TIME,TIME]|[STRING,TIMESTAMP]|[STRING,DATE]|[STRING,TIME]|[TIMESTAMP,STRING]|[DATE,STRING]|[TIME,STRING]}," + " but got [STRING,INTEGER]"); } @Test public void testCurDateWithArg() { - Throwable t = - Assert.assertThrows( - ExpressionEvaluationException.class, - () -> getRelNode("source=EMP | eval curdate = CURDATE(1) | fields curdate | head 1")); - verifyErrorMessageContains(t, "CURDATE function expects {[]}, but got [INTEGER]"); + verifyQueryThrowsException( + "source=EMP | eval curdate = CURDATE(1) | fields curdate | head 1", + "CURDATE function expects {[]}, but got [INTEGER]"); } // Test directly registered UDF: register(funcname, FuncImp) @Test public void testLtrimWrongArg() { - Exception e = - Assert.assertThrows( - ExpressionEvaluationException.class, - () -> 
getRelNode("source=EMP | where ltrim(EMPNO, DEPTNO) = 'Jim' | fields name, age")); - verifyErrorMessageContains(e, "LTRIM function expects {[STRING]}, but got [SHORT,BYTE]"); + verifyQueryThrowsException( + "source=EMP | where ltrim(EMPNO, DEPTNO) = 'Jim' | fields name, age", + "LTRIM function expects {[STRING]}, but got [SHORT,BYTE]"); } // Test udf registered via sql library operator: registerOperator(REVERSE, // SqlLibraryOperators.REVERSE); @Test public void testReverseWrongArgShouldThrow() { - Exception e = - Assert.assertThrows( - ExpressionEvaluationException.class, - () -> getRelNode("source=EMP | where reverse(EMPNO) = '3202' | fields year")); - verifyErrorMessageContains(e, "REVERSE function expects {[STRING]}, but got [SHORT]"); + verifyQueryThrowsException( + "source=EMP | where reverse(EMPNO) = '3202' | fields year", + "REVERSE function expects {[STRING]}, but got [SHORT]"); } // test type checking on UDF with direct registration: register(funcname, FuncImp) @Test public void testStrCmpWrongArgShouldThrow() { - Exception e = - Assert.assertThrows( - ExpressionEvaluationException.class, - () -> getRelNode("source=EMP | where strcmp(10, 'Jane') = 0 | fields name, age")); - verifyErrorMessageContains( - e, "STRCMP function expects {[STRING,STRING]}, but got [INTEGER,STRING]"); + verifyQueryThrowsException( + "source=EMP | where strcmp(10, 'Jane') = 0 | fields name, age", + "STRCMP function expects {[STRING,STRING]}, but got [INTEGER,STRING]"); } // Test registered Sql Std Operator: registerOperator(funcName, SqlStdOperatorTable.OPERATOR) @Test public void testLowerWrongArgShouldThrow() { - Exception e = - Assert.assertThrows( - ExpressionEvaluationException.class, - () -> getRelNode("source=EMP | where lower(EMPNO) = 'hello' | fields name, age")); - verifyErrorMessageContains(e, "LOWER function expects {[STRING]}, but got [SHORT]"); + verifyQueryThrowsException( + "source=EMP | where lower(EMPNO) = 'hello' | fields name, age", + "LOWER function expects 
{[STRING]}, but got [SHORT]"); } @Test public void testSha2WrongArgShouldThrow() { - Throwable e = - Assert.assertThrows( - ExpressionEvaluationException.class, - () -> - getRelNode( - "source=EMP | head 1 | eval sha256 = SHA2('hello', '256') | fields sha256")); - verifyErrorMessageContains( - e, "SHA2 function expects {[STRING,INTEGER]}, but got [STRING,STRING]"); + verifyQueryThrowsException( + "source=EMP | head 1 | eval sha256 = SHA2('hello', '256') | fields sha256", + "SHA2 function expects {[STRING,INTEGER]}, but got [STRING,STRING]"); } // Test type checking on udf with direct registration: register(SQRT, funcImp) @Test public void testSqrtWithWrongArg() { - Exception nanException = - Assert.assertThrows( - ExpressionEvaluationException.class, - () -> - getRelNode( - "source=EMP | head 1 | eval sqrt_name = sqrt(ENAME) | fields sqrt_name")); - verifyErrorMessageContains( - nanException, "SQRT function expects {[INTEGER],[DOUBLE]}, but got [STRING]"); + verifyQueryThrowsException( + "source=EMP | head 1 | eval sqrt_name = sqrt(ENAME) | fields sqrt_name", + "SQRT function expects {[INTEGER]|[DOUBLE]}, but got [STRING]"); } // Test UDF registered with PPL builtin operators: registerOperator(MOD, PPLBuiltinOperators.MOD); @Test public void testModWithWrongArg() { - Exception wrongArgException = - Assert.assertThrows( - ExpressionEvaluationException.class, - () -> getRelNode("source=EMP | eval z = mod(0.5, 1, 2) | fields z")); - verifyErrorMessageContains( - wrongArgException, + verifyQueryThrowsException( + "source=EMP | eval z = mod(0.5, 1, 2) | fields z", "MOD function expects" - + " {[INTEGER,INTEGER],[INTEGER,DOUBLE],[DOUBLE,INTEGER],[DOUBLE,DOUBLE]}, but got" + + " {[INTEGER,INTEGER]|[INTEGER,DOUBLE]|[DOUBLE,INTEGER]|[DOUBLE,DOUBLE]}, but got" + " [DOUBLE,INTEGER,INTEGER]"); } // Test UDF registered with sql std operators: registerOperator(PI, SqlStdOperatorTable.PI) @Test public void testPiWithArg() { - Exception wrongArgException = - Assert.assertThrows( - 
ExpressionEvaluationException.class, - () -> getRelNode("source=EMP | eval pi = pi(1) | fields pi")); - verifyErrorMessageContains(wrongArgException, "PI function expects {[]}, but got [INTEGER]"); + verifyQueryThrowsException( + "source=EMP | eval pi = pi(1) | fields pi", "PI function expects {[]}, but got [INTEGER]"); } // Test UDF registered with sql library operators: registerOperator(LOG2, // SqlLibraryOperators.LOG2) @Test public void testLog2WithWrongArgShouldThrow() { - Exception wrongArgException = - Assert.assertThrows( - ExpressionEvaluationException.class, - () -> getRelNode("source=EMP | eval log2 = log2(ENAME, JOB) | fields log2")); - verifyErrorMessageContains( - wrongArgException, "LOG2 function expects {[INTEGER],[DOUBLE]}, but got [STRING,STRING]"); - } - - @Test - public void testAvgWithWrongArgType() { - Exception e = - Assert.assertThrows( - ExpressionEvaluationException.class, - () -> getRelNode("source=EMP | stats avg(ENAME) as avg_name")); - verifyErrorMessageContains( - e, "Aggregation function AVG expects field type {[INTEGER],[DOUBLE]}, but got [STRING]"); - } - - @Test - public void testVarsampWithWrongArgType() { - Exception e = - Assert.assertThrows( - ExpressionEvaluationException.class, - () -> getRelNode("source=EMP | stats var_samp(ENAME) as varsamp_name")); - verifyErrorMessageContains( - e, - "Aggregation function VARSAMP expects field type {[INTEGER],[DOUBLE]}, but got [STRING]"); - } - - @Test - public void testVarpopWithWrongArgType() { - Exception e = - Assert.assertThrows( - ExpressionEvaluationException.class, - () -> getRelNode("source=EMP | stats var_pop(ENAME) as varpop_name")); - verifyErrorMessageContains( - e, "Aggregation function VARPOP expects field type {[INTEGER],[DOUBLE]}, but got [STRING]"); - } - - @Test - public void testStddevSampWithWrongArgType() { - Exception e = - Assert.assertThrows( - ExpressionEvaluationException.class, - () -> getRelNode("source=EMP | stats stddev_samp(ENAME) as stddev_name")); - 
verifyErrorMessageContains( - e, - "Aggregation function STDDEV_SAMP expects field type {[INTEGER],[DOUBLE]}, but got" - + " [STRING]"); - } - - @Test - public void testStddevPopWithWrongArgType() { - Exception e = - Assert.assertThrows( - ExpressionEvaluationException.class, - () -> getRelNode("source=EMP | stats stddev_pop(ENAME) as stddev_name")); - verifyErrorMessageContains( - e, - "Aggregation function STDDEV_POP expects field type {[INTEGER],[DOUBLE]}, but got" - + " [STRING]"); - } - - @Test - public void testPercentileApproxWithWrongArgType() { - // First argument should be numeric - Exception e1 = - Assert.assertThrows( - ExpressionEvaluationException.class, - () -> getRelNode("source=EMP | stats percentile_approx(ENAME, 50) as percentile")); - verifyErrorMessageContains( - e1, - "Aggregation function PERCENTILE_APPROX expects field type and additional arguments" - + " {[INTEGER,INTEGER],[INTEGER,DOUBLE],[DOUBLE,INTEGER],[DOUBLE,DOUBLE],[INTEGER,INTEGER,INTEGER],[INTEGER,INTEGER,DOUBLE],[INTEGER,DOUBLE,INTEGER],[INTEGER,DOUBLE,DOUBLE],[DOUBLE,INTEGER,INTEGER],[DOUBLE,INTEGER,DOUBLE],[DOUBLE,DOUBLE,INTEGER],[DOUBLE,DOUBLE,DOUBLE]}," - + " but got [STRING,INTEGER]"); - } - - @Test - public void testListFunctionWithArrayArgType() { - // Test LIST function with array expression (which is not a supported scalar type) - Exception e = - Assert.assertThrows( - ExpressionEvaluationException.class, - () -> getRelNode("source=EMP | stats list(array(ENAME, JOB)) as name_list")); - verifyErrorMessageContains( - e, - "Aggregation function LIST expects field type" - + " {[BYTE],[SHORT],[INTEGER],[LONG],[FLOAT],[DOUBLE],[STRING],[BOOLEAN],[DATE],[TIME],[TIMESTAMP],[IP],[BINARY]}," - + " but got [ARRAY]"); + verifyQueryThrowsException( + "source=EMP | eval log2 = log2(ENAME, JOB) | fields log2", + "LOG2 function expects {[INTEGER]|[DOUBLE]}, but got [STRING,STRING]"); } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEarliestLatestTest.java 
b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStatsEarliestLatestTest.java similarity index 65% rename from ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEarliestLatestTest.java rename to ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStatsEarliestLatestTest.java index c9c260cd444..f5ee3780411 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLEarliestLatestTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLStatsEarliestLatestTest.java @@ -5,38 +5,18 @@ package org.opensearch.sql.ppl.calcite; -import com.google.common.collect.ImmutableList; -import java.sql.Date; import java.util.List; -import lombok.RequiredArgsConstructor; -import org.apache.calcite.DataContext; -import org.apache.calcite.config.CalciteConnectionConfig; -import org.apache.calcite.linq4j.Enumerable; -import org.apache.calcite.linq4j.Linq4j; import org.apache.calcite.plan.RelTraitDef; -import org.apache.calcite.rel.RelCollations; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.schema.ScannableTable; -import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.schema.Statistic; -import org.apache.calcite.schema.Statistics; -import org.apache.calcite.sql.SqlCall; -import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.parser.SqlParser; -import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.test.CalciteAssert; import org.apache.calcite.tools.Frameworks; import org.apache.calcite.tools.Programs; -import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.Test; -/** Unit tests for {@code earliest/latest} functions with @timestamp field in PPL. 
*/ -public class CalcitePPLEarliestLatestTest extends CalcitePPLAbstractTest { - public CalcitePPLEarliestLatestTest() { +public class CalcitePPLStatsEarliestLatestTest extends CalcitePPLAbstractTest { + public CalcitePPLStatsEarliestLatestTest() { super(CalciteAssert.SchemaSpec.POST); } @@ -45,46 +25,10 @@ protected Frameworks.ConfigBuilder config(CalciteAssert.SchemaSpec... schemaSpec final SchemaPlus rootSchema = Frameworks.createRootSchema(true); final SchemaPlus schema = CalciteAssert.addSchema(rootSchema, schemaSpecs); - // Add a test table with @timestamp and created_at fields - // Note: @timestamp and created_at have different orderings to test explicit field usage - ImmutableList rows = - ImmutableList.of( - new Object[] { - "server1", - "ERROR", - "Database connection failed", - Date.valueOf("2023-01-01"), - Date.valueOf("2023-01-05") - }, - new Object[] { - "server2", - "INFO", - "Service started", - Date.valueOf("2023-01-02"), - Date.valueOf("2023-01-04") - }, - new Object[] { - "server1", - "WARN", - "High memory usage", - Date.valueOf("2023-01-03"), - Date.valueOf("2023-01-03") - }, - new Object[] { - "server3", - "ERROR", - "Disk space low", - Date.valueOf("2023-01-04"), - Date.valueOf("2023-01-02") - }, - new Object[] { - "server2", - "INFO", - "Backup completed", - Date.valueOf("2023-01-05"), - Date.valueOf("2023-01-01") - }); - schema.add("LOGS", new LogsTable(rows)); + schema.add( + "LOGS", + new CalcitePPLEarliestLatestTestUtils.LogsTable( + CalcitePPLEarliestLatestTestUtils.createLogsTestData())); return Frameworks.newConfigBuilder() .parserConfig(SqlParser.Config.DEFAULT) @@ -238,55 +182,4 @@ public void testLatestWithExplicitTimestampField() { "SELECT MAX_BY (`message`, `created_at`) `latest_message`\n" + "FROM `POST`.`LOGS`"; verifyPPLToSparkSQL(root, expectedSparkSql); } - - // Custom table implementation with @timestamp field - @RequiredArgsConstructor - public static class LogsTable implements ScannableTable { - private final 
ImmutableList rows; - - protected final RelProtoDataType protoRowType = - factory -> - factory - .builder() - .add("server", SqlTypeName.VARCHAR) - .add("level", SqlTypeName.VARCHAR) - .add("message", SqlTypeName.VARCHAR) - .add("@timestamp", SqlTypeName.DATE) - .add("created_at", SqlTypeName.DATE) - .build(); - - @Override - public Enumerable<@Nullable Object[]> scan(DataContext root) { - return Linq4j.asEnumerable(rows); - } - - @Override - public RelDataType getRowType(RelDataTypeFactory typeFactory) { - return protoRowType.apply(typeFactory); - } - - @Override - public Statistic getStatistic() { - return Statistics.of(0d, ImmutableList.of(), RelCollations.createSingleton(0)); - } - - @Override - public Schema.TableType getJdbcTableType() { - return Schema.TableType.TABLE; - } - - @Override - public boolean isRolledUp(String column) { - return false; - } - - @Override - public boolean rolledUpColumnValidInsideAgg( - String column, - SqlCall call, - @Nullable SqlNode parent, - @Nullable CalciteConnectionConfig config) { - return false; - } - } }