-
Notifications
You must be signed in to change notification settings - Fork 180
Add earliest/latest aggregate function for eventstats PPL command #4212
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
17d25ec
673f0c3
facbfcf
6eb31d3
ed77e18
6dd3a76
fbe2fa7
abd9586
9d3e9c6
2b8bf6b
35dd27c
892c42f
b2eba67
a2d2295
a5a0481
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -231,7 +231,6 @@ | |
| import java.util.HashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import java.util.Optional; | ||
| import java.util.StringJoiner; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
|
|
@@ -261,7 +260,6 @@ | |
| import org.apache.logging.log4j.LogManager; | ||
| import org.apache.logging.log4j.Logger; | ||
| import org.opensearch.sql.calcite.CalcitePlanContext; | ||
| import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory; | ||
| import org.opensearch.sql.calcite.utils.PPLOperandTypes; | ||
| import org.opensearch.sql.calcite.utils.PlanUtils; | ||
| import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils; | ||
|
|
@@ -407,25 +405,40 @@ public void registerExternalAggOperator( | |
| aggExternalFunctionRegistry.put(functionName, Pair.of(signature, handler)); | ||
| } | ||
|
|
||
| public void validateAggFunctionSignature( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggestion: Consider using an From a safety standpoint, it would be better design to create a record type and construct it with validation. Then any methods that require the function signature can take an Otherwise, we rely on the rest of the code to "just know" whether the input is already validated or not.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is actually using the signature coming from function registry (this is just delegating validation). It needs to be done here since we are converting some aggregate/window function to different ones, and add/remove/change attributes. |
||
| BuiltinFunctionName functionName, RexNode field, List<RexNode> argList) { | ||
| var implementation = getImplementation(functionName); | ||
| validateFunctionArgs(implementation, functionName, field, argList); | ||
| } | ||
|
|
||
| public RelBuilder.AggCall resolveAgg( | ||
| BuiltinFunctionName functionName, | ||
| boolean distinct, | ||
| RexNode field, | ||
| List<RexNode> argList, | ||
| CalcitePlanContext context) { | ||
| var implementation = aggExternalFunctionRegistry.get(functionName); | ||
| if (implementation == null) { | ||
| implementation = aggFunctionRegistry.get(functionName); | ||
| } | ||
| if (implementation == null) { | ||
| throw new IllegalStateException(String.format("Cannot resolve function: %s", functionName)); | ||
| } | ||
| var implementation = getImplementation(functionName); | ||
|
|
||
| // Validation is done based on original argument types to generate error from user perspective. | ||
| validateFunctionArgs(implementation, functionName, field, argList); | ||
|
|
||
| var handler = implementation.getValue(); | ||
| return handler.apply(distinct, field, argList, context); | ||
| } | ||
|
|
||
| static void validateFunctionArgs( | ||
| Pair<CalciteFuncSignature, AggHandler> implementation, | ||
| BuiltinFunctionName functionName, | ||
| RexNode field, | ||
| List<RexNode> argList) { | ||
| CalciteFuncSignature signature = implementation.getKey(); | ||
|
|
||
| List<RelDataType> argTypes = new ArrayList<>(); | ||
| if (field != null) { | ||
| argTypes.add(field.getType()); | ||
| } | ||
| // Currently only PERCENTILE_APPROX and TAKE have additional arguments. | ||
|
|
||
| // Currently only PERCENTILE_APPROX, TAKE, EARLIEST, and LATEST have additional arguments. | ||
| // Their additional arguments will always come as a map of <argName, value> | ||
| List<RelDataType> additionalArgTypes = | ||
| argList.stream().map(PlanUtils::derefMapCall).map(RexNode::getType).toList(); | ||
|
|
@@ -441,10 +454,20 @@ public RelBuilder.AggCall resolveAgg( | |
| errorMessagePattern, | ||
| functionName, | ||
| signature.typeChecker().getAllowedSignatures(), | ||
| getActualSignature(argTypes))); | ||
| PlanUtils.getActualSignature(argTypes))); | ||
| } | ||
| var handler = implementation.getValue(); | ||
| return handler.apply(distinct, field, argList, context); | ||
| } | ||
|
|
||
| private Pair<CalciteFuncSignature, AggHandler> getImplementation( | ||
| BuiltinFunctionName functionName) { | ||
| var implementation = aggExternalFunctionRegistry.get(functionName); | ||
| if (implementation == null) { | ||
| implementation = aggFunctionRegistry.get(functionName); | ||
| } | ||
| if (implementation == null) { | ||
| throw new IllegalStateException(String.format("Cannot resolve function: %s", functionName)); | ||
| } | ||
| return implementation; | ||
| } | ||
|
|
||
| public RexNode resolve(final RexBuilder builder, final String functionName, RexNode... args) { | ||
|
|
@@ -492,7 +515,7 @@ public RexNode resolve( | |
| throw new ExpressionEvaluationException( | ||
| String.format( | ||
| "Cannot resolve function: %s, arguments: %s, caused by: %s", | ||
| functionName, getActualSignature(argTypes), e.getMessage()), | ||
| functionName, PlanUtils.getActualSignature(argTypes), e.getMessage()), | ||
| e); | ||
| } | ||
| StringJoiner allowedSignatures = new StringJoiner(","); | ||
|
|
@@ -505,7 +528,7 @@ functionName, getActualSignature(argTypes), e.getMessage()), | |
| throw new ExpressionEvaluationException( | ||
| String.format( | ||
| "%s function expects {%s}, but got %s", | ||
| functionName, allowedSignatures, getActualSignature(argTypes))); | ||
| functionName, allowedSignatures, PlanUtils.getActualSignature(argTypes))); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -1072,21 +1095,6 @@ void registerOperator(BuiltinFunctionName functionName, SqlAggFunction aggFuncti | |
| register(functionName, handler, typeChecker); | ||
| } | ||
|
|
||
| private static RexNode resolveTimeField(List<RexNode> argList, CalcitePlanContext ctx) { | ||
| if (argList.isEmpty()) { | ||
| // Try to find @timestamp field | ||
| var timestampField = | ||
| ctx.relBuilder.peek().getRowType().getField("@timestamp", false, false); | ||
| if (timestampField == null) { | ||
| throw new IllegalArgumentException( | ||
| "Default @timestamp field not found. Please specify a time field explicitly."); | ||
| } | ||
| return ctx.rexBuilder.makeInputRef(timestampField.getType(), timestampField.getIndex()); | ||
| } else { | ||
| return PlanUtils.derefMapCall(argList.get(0)); | ||
| } | ||
| } | ||
|
|
||
| void populate() { | ||
| registerOperator(MAX, SqlStdOperatorTable.MAX); | ||
| registerOperator(MIN, SqlStdOperatorTable.MIN); | ||
|
|
@@ -1116,8 +1124,7 @@ void populate() { | |
| return ctx.relBuilder.count(distinct, null, field); | ||
| } | ||
| }, | ||
| wrapSqlOperandTypeChecker( | ||
| SqlStdOperatorTable.COUNT.getOperandTypeChecker(), COUNT.name(), false)); | ||
| wrapSqlOperandTypeChecker(PPLOperandTypes.OPTIONAL_ANY, COUNT.name(), false)); | ||
|
|
||
| register( | ||
| PERCENTILE_APPROX, | ||
|
|
@@ -1164,20 +1171,22 @@ void populate() { | |
| register( | ||
| EARLIEST, | ||
| (distinct, field, argList, ctx) -> { | ||
| RexNode timeField = resolveTimeField(argList, ctx); | ||
| return ctx.relBuilder.aggregateCall(SqlStdOperatorTable.ARG_MIN, field, timeField); | ||
| List<RexNode> args = resolveTimeField(argList, ctx); | ||
| return UserDefinedFunctionUtils.makeAggregateCall( | ||
| SqlStdOperatorTable.ARG_MIN, List.of(field), args, ctx.relBuilder); | ||
| }, | ||
| wrapSqlOperandTypeChecker( | ||
| SqlStdOperatorTable.ARG_MIN.getOperandTypeChecker(), EARLIEST.name(), false)); | ||
| PPLOperandTypes.ANY_OPTIONAL_TIMESTAMP, EARLIEST.name(), false)); | ||
|
|
||
| register( | ||
| LATEST, | ||
| (distinct, field, argList, ctx) -> { | ||
| RexNode timeField = resolveTimeField(argList, ctx); | ||
| return ctx.relBuilder.aggregateCall(SqlStdOperatorTable.ARG_MAX, field, timeField); | ||
| List<RexNode> args = resolveTimeField(argList, ctx); | ||
| return UserDefinedFunctionUtils.makeAggregateCall( | ||
| SqlStdOperatorTable.ARG_MAX, List.of(field), args, ctx.relBuilder); | ||
| }, | ||
| wrapSqlOperandTypeChecker( | ||
| SqlStdOperatorTable.ARG_MAX.getOperandTypeChecker(), LATEST.name(), false)); | ||
| PPLOperandTypes.ANY_OPTIONAL_TIMESTAMP, EARLIEST.name(), false)); | ||
|
|
||
| // Register FIRST function - uses document order | ||
| register( | ||
|
|
@@ -1201,19 +1210,19 @@ void populate() { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Get a string representation of the argument types expressed in ExprType for error messages. | ||
| * | ||
| * @param argTypes the list of argument types as {@link RelDataType} | ||
| * @return a string in the format [type1,type2,...] representing the argument types | ||
| */ | ||
| private static String getActualSignature(List<RelDataType> argTypes) { | ||
| return "[" | ||
| + argTypes.stream() | ||
| .map(OpenSearchTypeFactory::convertRelDataTypeToExprType) | ||
| .map(Objects::toString) | ||
| .collect(Collectors.joining(",")) | ||
| + "]"; | ||
| static List<RexNode> resolveTimeField(List<RexNode> argList, CalcitePlanContext ctx) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We may reuse this in any function with implicit @timestamp field? #4138
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method relies on the position of time field param, and cannot be directly reused, but we should unify the logic to refer implicit(default) timestamp field. Added tracking issue: #4275 |
||
| if (argList.isEmpty()) { | ||
| // Try to find @timestamp field | ||
| var timestampField = ctx.relBuilder.peek().getRowType().getField("@timestamp", false, false); | ||
| if (timestampField == null) { | ||
| throw new IllegalArgumentException( | ||
| "Default @timestamp field not found. Please specify a time field explicitly."); | ||
| } | ||
| return List.of( | ||
| ctx.rexBuilder.makeInputRef(timestampField.getType(), timestampField.getIndex())); | ||
| } else { | ||
| return argList.stream().map(PlanUtils::derefMapCall).collect(Collectors.toList()); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -1257,6 +1266,8 @@ private static PPLTypeChecker wrapSqlOperandTypeChecker( | |
| pplTypeChecker = PPLTypeChecker.wrapComparable(comparableTypeChecker); | ||
| } else if (typeChecker instanceof UDFOperandMetadata.UDTOperandMetadata udtOperandMetadata) { | ||
| pplTypeChecker = PPLTypeChecker.wrapUDT(udtOperandMetadata.allowedParamTypes()); | ||
| } else if (typeChecker != null) { | ||
| pplTypeChecker = PPLTypeChecker.wrapDefault(typeChecker); | ||
| } else { | ||
| logger.info( | ||
| "Cannot create type checker for function: {}. Will skip its type checking", functionName); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.