-
Notifications
You must be signed in to change notification settings - Fork 180
Add per_second function support for timechart command
#4464
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 26 commits
29d4bf8
9aff342
86920fa
1276017
0ae12ff
3caab3c
875d883
346cc5e
ac88a6f
69d07a8
ab05e3d
cf32829
4e09326
768ba11
5081528
e1cb01c
5a4400c
47cfd71
79efdb3
38a1039
832528f
f2fedd3
5ecf154
aef965a
83ee450
519ad59
995171e
c2d96c1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,14 +5,43 @@ | |
|
|
||
| package org.opensearch.sql.ast.tree; | ||
|
|
||
| import static org.opensearch.sql.ast.dsl.AstDSL.aggregate; | ||
| import static org.opensearch.sql.ast.dsl.AstDSL.doubleLiteral; | ||
| import static org.opensearch.sql.ast.dsl.AstDSL.eval; | ||
| import static org.opensearch.sql.ast.dsl.AstDSL.function; | ||
| import static org.opensearch.sql.ast.dsl.AstDSL.stringLiteral; | ||
| import static org.opensearch.sql.ast.expression.IntervalUnit.SECOND; | ||
| import static org.opensearch.sql.ast.tree.Timechart.PerFunctionRateExprBuilder.sum; | ||
| import static org.opensearch.sql.ast.tree.Timechart.PerFunctionRateExprBuilder.timestampadd; | ||
| import static org.opensearch.sql.ast.tree.Timechart.PerFunctionRateExprBuilder.timestampdiff; | ||
| import static org.opensearch.sql.calcite.plan.OpenSearchConstants.IMPLICIT_FIELD_TIMESTAMP; | ||
| import static org.opensearch.sql.expression.function.BuiltinFunctionName.DIVIDE; | ||
| import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTIPLY; | ||
| import static org.opensearch.sql.expression.function.BuiltinFunctionName.SUM; | ||
| import static org.opensearch.sql.expression.function.BuiltinFunctionName.TIMESTAMPADD; | ||
| import static org.opensearch.sql.expression.function.BuiltinFunctionName.TIMESTAMPDIFF; | ||
|
|
||
| import com.google.common.collect.ImmutableList; | ||
| import java.util.List; | ||
| import java.util.Locale; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
| import lombok.AllArgsConstructor; | ||
| import lombok.EqualsAndHashCode; | ||
| import lombok.Getter; | ||
| import lombok.RequiredArgsConstructor; | ||
| import lombok.ToString; | ||
| import org.opensearch.sql.ast.AbstractNodeVisitor; | ||
| import org.opensearch.sql.ast.dsl.AstDSL; | ||
| import org.opensearch.sql.ast.expression.AggregateFunction; | ||
| import org.opensearch.sql.ast.expression.Field; | ||
| import org.opensearch.sql.ast.expression.Function; | ||
| import org.opensearch.sql.ast.expression.IntervalUnit; | ||
| import org.opensearch.sql.ast.expression.Let; | ||
| import org.opensearch.sql.ast.expression.Span; | ||
| import org.opensearch.sql.ast.expression.SpanUnit; | ||
| import org.opensearch.sql.ast.expression.UnresolvedExpression; | ||
| import org.opensearch.sql.calcite.utils.PlanUtils; | ||
|
|
||
| /** AST node representing the Timechart operation. */ | ||
| @Getter | ||
|
|
@@ -49,8 +78,9 @@ public Timechart useOther(Boolean useOther) { | |
| } | ||
|
|
||
| @Override | ||
| public Timechart attach(UnresolvedPlan child) { | ||
| return toBuilder().child(child).build(); | ||
| public UnresolvedPlan attach(UnresolvedPlan child) { | ||
| // Transform after child attached to avoid unintentionally overriding it | ||
| return toBuilder().child(child).build().transformPerFunction(); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -62,4 +92,112 @@ public List<UnresolvedPlan> getChild() { | |
| public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) { | ||
| return nodeVisitor.visitTimechart(this, context); | ||
| } | ||
|
|
||
| /** | ||
| * Rewrites a per function (e.g. per_second) into a sum aggregation computed by timechart, | ||
| * followed by eval-based post-processing of the sum result. Specifically, the number of seconds | ||
| * in each time bucket is calculated dynamically from the span option, and the aggregated sum | ||
| * value, multiplied by the per function's unit in seconds, is divided by that number of seconds | ||
| * to get the rate. | ||
| * | ||
| * <p>For example, with span=5m, per_second(field) = sum(field) * 1 / 300 seconds. | ||
| * | ||
| * @return eval over timechart if a per function is present, or the original timechart otherwise. | ||
| */ | ||
| private UnresolvedPlan transformPerFunction() { | ||
| Optional<PerFunction> perFuncOpt = PerFunction.from(aggregateFunction); | ||
| if (perFuncOpt.isEmpty()) { | ||
| return this; | ||
| } | ||
|
|
||
| PerFunction perFunc = perFuncOpt.get(); | ||
| Span span = (Span) this.binExpression; | ||
| Field spanStartTime = AstDSL.field(IMPLICIT_FIELD_TIMESTAMP); | ||
| Function spanEndTime = timestampadd(span.getUnit(), span.getValue(), spanStartTime); | ||
| Function spanSeconds = timestampdiff(SECOND, spanStartTime, spanEndTime); | ||
|
|
||
| return eval( | ||
| timechart(AstDSL.alias(perFunc.aggName, sum(perFunc.aggArg))), | ||
| let(perFunc.aggName).multiply(perFunc.seconds).dividedBy(spanSeconds)); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What happens if the span evaluates to 0 seconds (e.g., with millisecond spans or other edge cases)? Should there be validation or error handling for division by zero?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good catch. Let me check if
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I quickly tested this: opensearchsql> source=test_data_2023 | timechart span=0m per_second(packets);
TransportError(500, 'SearchPhaseExecutionException', {'error':
{'reason':'Error occurred in OpenSearch engine: all shards failed', 'details': 'Shard[0]:
java.lang.IllegalArgumentException: Zero or negative time interval not supported\n\n
For more details, please send request for Json format to see the raw response from OpenSearch engine.',
'type': 'SearchPhaseExecutionException'}, 'status': 400})
opensearchsql> source=test_data_2023 | timechart span=-1m per_second(packets);
{'reason': 'Invalid Query', 'details': "[-] is not a valid term at this part of the query: '...23
| timechart span=-' <-- HERE. extraneous input '-' expecting {SPANLENGTH, INTEGER_LITERAL}",
'type': 'SyntaxCheckException'}
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Created issue: #4527. Thanks! |
||
| } | ||
|
|
||
| private Timechart timechart(UnresolvedExpression newAggregateFunction) { | ||
| return this.toBuilder().aggregateFunction(newAggregateFunction).build(); | ||
| } | ||
|
|
||
| /** TODO: extend to support additional per_* functions */ | ||
| @RequiredArgsConstructor | ||
| static class PerFunction { | ||
| private static final Map<String, Integer> UNIT_SECONDS = Map.of("per_second", 1); | ||
| private final String aggName; | ||
| private final UnresolvedExpression aggArg; | ||
| private final int seconds; | ||
|
|
||
| static Optional<PerFunction> from(UnresolvedExpression aggExpr) { | ||
| if (!(aggExpr instanceof AggregateFunction)) { | ||
| return Optional.empty(); | ||
| } | ||
|
|
||
| AggregateFunction aggFunc = (AggregateFunction) aggExpr; | ||
| String aggFuncName = aggFunc.getFuncName().toLowerCase(Locale.ROOT); | ||
| if (!UNIT_SECONDS.containsKey(aggFuncName)) { | ||
| return Optional.empty(); | ||
| } | ||
|
|
||
| String aggName = toAggName(aggFunc); | ||
| return Optional.of( | ||
| new PerFunction(aggName, aggFunc.getField(), UNIT_SECONDS.get(aggFuncName))); | ||
| } | ||
|
|
||
| private static String toAggName(AggregateFunction aggFunc) { | ||
| String fieldName = | ||
| (aggFunc.getField() instanceof Field) | ||
| ? ((Field) aggFunc.getField()).getField().toString() | ||
| : aggFunc.getField().toString(); | ||
| return String.format(Locale.ROOT, "%s(%s)", aggFunc.getFuncName(), fieldName); | ||
| } | ||
| } | ||
|
|
||
| private PerFunctionRateExprBuilder let(String fieldName) { | ||
| return new PerFunctionRateExprBuilder(AstDSL.field(fieldName)); | ||
| } | ||
|
|
||
| /** Fluent builder for creating Let expressions with mathematical operations. */ | ||
| static class PerFunctionRateExprBuilder { | ||
| private final Field field; | ||
| private UnresolvedExpression expr; | ||
|
|
||
| PerFunctionRateExprBuilder(Field field) { | ||
| this.field = field; | ||
| this.expr = field; | ||
| } | ||
|
|
||
| PerFunctionRateExprBuilder multiply(Integer multiplier) { | ||
| // Promote to double literal to avoid integer division in downstream | ||
| this.expr = | ||
| function( | ||
| MULTIPLY.getName().getFunctionName(), expr, doubleLiteral(multiplier.doubleValue())); | ||
| return this; | ||
| } | ||
|
|
||
| Let dividedBy(UnresolvedExpression divisor) { | ||
| return AstDSL.let(field, function(DIVIDE.getName().getFunctionName(), expr, divisor)); | ||
| } | ||
|
|
||
| static UnresolvedExpression sum(UnresolvedExpression field) { | ||
| return aggregate(SUM.getName().getFunctionName(), field); | ||
| } | ||
|
|
||
| static Function timestampadd( | ||
| SpanUnit unit, UnresolvedExpression value, UnresolvedExpression timestampField) { | ||
| UnresolvedExpression intervalUnit = | ||
| stringLiteral(PlanUtils.spanUnitToIntervalUnit(unit).toString()); | ||
| return function( | ||
| TIMESTAMPADD.getName().getFunctionName(), intervalUnit, value, timestampField); | ||
| } | ||
|
|
||
| static Function timestampdiff( | ||
| IntervalUnit unit, UnresolvedExpression start, UnresolvedExpression end) { | ||
| return function( | ||
| TIMESTAMPDIFF.getName().getFunctionName(), stringLiteral(unit.toString()), start, end); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,155 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.sql.ast.tree; | ||
|
|
||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
| import static org.opensearch.sql.ast.dsl.AstDSL.aggregate; | ||
| import static org.opensearch.sql.ast.dsl.AstDSL.alias; | ||
| import static org.opensearch.sql.ast.dsl.AstDSL.doubleLiteral; | ||
| import static org.opensearch.sql.ast.dsl.AstDSL.field; | ||
| import static org.opensearch.sql.ast.dsl.AstDSL.function; | ||
| import static org.opensearch.sql.ast.dsl.AstDSL.intLiteral; | ||
| import static org.opensearch.sql.ast.dsl.AstDSL.relation; | ||
|
|
||
| import org.junit.jupiter.api.Test; | ||
| import org.junit.jupiter.params.ParameterizedTest; | ||
| import org.junit.jupiter.params.provider.CsvSource; | ||
| import org.opensearch.sql.ast.dsl.AstDSL; | ||
| import org.opensearch.sql.ast.expression.AggregateFunction; | ||
| import org.opensearch.sql.ast.expression.Let; | ||
| import org.opensearch.sql.ast.expression.Span; | ||
| import org.opensearch.sql.ast.expression.SpanUnit; | ||
| import org.opensearch.sql.ast.expression.UnresolvedExpression; | ||
|
|
||
| class TimechartTest { | ||
|
|
||
| @ParameterizedTest | ||
| @CsvSource({"1, m, MINUTE", "30, s, SECOND", "5, m, MINUTE", "2, h, HOUR", "1, d, DAY"}) | ||
| void should_transform_per_second_for_different_spans( | ||
| int spanValue, String spanUnit, String expectedIntervalUnit) { | ||
| withTimechart(span(spanValue, spanUnit), perSecond("bytes")) | ||
| .whenTransformingPerFunction() | ||
| .thenExpect( | ||
| eval( | ||
| let( | ||
| "per_second(bytes)", | ||
| divide( | ||
| multiply("per_second(bytes)", 1.0), | ||
| timestampdiff( | ||
| "SECOND", | ||
| "@timestamp", | ||
| timestampadd(expectedIntervalUnit, spanValue, "@timestamp")))), | ||
| timechart(span(spanValue, spanUnit), alias("per_second(bytes)", sum("bytes"))))); | ||
| } | ||
|
|
||
| @Test | ||
| void should_not_transform_non_per_functions() { | ||
| withTimechart(span(1, "m"), sum("bytes")) | ||
| .whenTransformingPerFunction() | ||
| .thenExpect(timechart(span(1, "m"), sum("bytes"))); | ||
| } | ||
|
|
||
| @Test | ||
| void should_preserve_all_fields_during_per_function_transformation() { | ||
| Timechart original = | ||
| new Timechart(relation("logs"), perSecond("bytes")) | ||
| .span(span(5, "m")) | ||
| .by(field("status")) | ||
| .limit(20) | ||
| .useOther(false); | ||
|
|
||
| Timechart expected = | ||
| new Timechart(relation("logs"), alias("per_second(bytes)", sum("bytes"))) | ||
| .span(span(5, "m")) | ||
| .by(field("status")) | ||
| .limit(20) | ||
| .useOther(false); | ||
|
|
||
| withTimechart(original) | ||
| .whenTransformingPerFunction() | ||
| .thenExpect( | ||
| eval( | ||
| let( | ||
| "per_second(bytes)", | ||
| divide( | ||
| multiply("per_second(bytes)", 1.0), | ||
| timestampdiff( | ||
| "SECOND", "@timestamp", timestampadd("MINUTE", 5, "@timestamp")))), | ||
| expected)); | ||
| } | ||
|
|
||
| // Fluent API for readable test assertions | ||
|
|
||
| private static TransformationAssertion withTimechart(Span spanExpr, AggregateFunction aggFunc) { | ||
| return new TransformationAssertion(timechart(spanExpr, aggFunc)); | ||
| } | ||
|
|
||
| private static TransformationAssertion withTimechart(Timechart timechart) { | ||
| return new TransformationAssertion(timechart); | ||
| } | ||
|
|
||
| private static Timechart timechart(Span spanExpr, UnresolvedExpression aggExpr) { | ||
| // Set child here because expected object won't call attach below | ||
| return new Timechart(relation("t"), aggExpr).span(spanExpr).limit(10).useOther(true); | ||
| } | ||
|
|
||
| private static Span span(int value, String unit) { | ||
| return AstDSL.span(field("@timestamp"), intLiteral(value), SpanUnit.of(unit)); | ||
| } | ||
|
|
||
| private static AggregateFunction perSecond(String fieldName) { | ||
| return (AggregateFunction) aggregate("per_second", field(fieldName)); | ||
| } | ||
|
|
||
| private static AggregateFunction sum(String fieldName) { | ||
| return (AggregateFunction) aggregate("sum", field(fieldName)); | ||
| } | ||
|
|
||
| private static Let let(String fieldName, UnresolvedExpression expression) { | ||
| return AstDSL.let(field(fieldName), expression); | ||
| } | ||
|
|
||
| private static UnresolvedExpression multiply(String fieldName, double right) { | ||
| return function("*", field(fieldName), doubleLiteral(right)); | ||
| } | ||
|
|
||
| private static UnresolvedExpression divide( | ||
| UnresolvedExpression left, UnresolvedExpression right) { | ||
| return function("/", left, right); | ||
| } | ||
|
|
||
| private static UnresolvedExpression timestampadd(String unit, int value, String timestampField) { | ||
| return function( | ||
| "timestampadd", AstDSL.stringLiteral(unit), intLiteral(value), field(timestampField)); | ||
| } | ||
|
|
||
| private static UnresolvedExpression timestampdiff( | ||
| String unit, String startField, UnresolvedExpression end) { | ||
| return function("timestampdiff", AstDSL.stringLiteral(unit), field(startField), end); | ||
| } | ||
|
|
||
| private static UnresolvedPlan eval(Let letExpr, Timechart timechartExpr) { | ||
| return AstDSL.eval(timechartExpr, letExpr); | ||
| } | ||
|
|
||
| private static class TransformationAssertion { | ||
| private final Timechart timechart; | ||
| private UnresolvedPlan result; | ||
|
|
||
| TransformationAssertion(Timechart timechart) { | ||
| this.timechart = timechart; | ||
| } | ||
|
|
||
| public TransformationAssertion whenTransformingPerFunction() { | ||
| this.result = timechart.attach(timechart.getChild().get(0)); | ||
| return this; | ||
| } | ||
|
|
||
| public void thenExpect(UnresolvedPlan expected) { | ||
| assertEquals(expected, result); | ||
| } | ||
| } | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.