diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/Timechart.java b/core/src/main/java/org/opensearch/sql/ast/tree/Timechart.java index 1a4c154209c..05646a363f6 100644 --- a/core/src/main/java/org/opensearch/sql/ast/tree/Timechart.java +++ b/core/src/main/java/org/opensearch/sql/ast/tree/Timechart.java @@ -5,14 +5,43 @@ package org.opensearch.sql.ast.tree; +import static org.opensearch.sql.ast.dsl.AstDSL.aggregate; +import static org.opensearch.sql.ast.dsl.AstDSL.doubleLiteral; +import static org.opensearch.sql.ast.dsl.AstDSL.eval; +import static org.opensearch.sql.ast.dsl.AstDSL.function; +import static org.opensearch.sql.ast.dsl.AstDSL.stringLiteral; +import static org.opensearch.sql.ast.expression.IntervalUnit.SECOND; +import static org.opensearch.sql.ast.tree.Timechart.PerFunctionRateExprBuilder.sum; +import static org.opensearch.sql.ast.tree.Timechart.PerFunctionRateExprBuilder.timestampadd; +import static org.opensearch.sql.ast.tree.Timechart.PerFunctionRateExprBuilder.timestampdiff; +import static org.opensearch.sql.calcite.plan.OpenSearchConstants.IMPLICIT_FIELD_TIMESTAMP; +import static org.opensearch.sql.expression.function.BuiltinFunctionName.DIVIDE; +import static org.opensearch.sql.expression.function.BuiltinFunctionName.MULTIPLY; +import static org.opensearch.sql.expression.function.BuiltinFunctionName.SUM; +import static org.opensearch.sql.expression.function.BuiltinFunctionName.TIMESTAMPADD; +import static org.opensearch.sql.expression.function.BuiltinFunctionName.TIMESTAMPDIFF; + import com.google.common.collect.ImmutableList; import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; import lombok.AllArgsConstructor; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.ToString; import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.dsl.AstDSL; +import org.opensearch.sql.ast.expression.AggregateFunction; +import org.opensearch.sql.ast.expression.Field; +import org.opensearch.sql.ast.expression.Function; +import org.opensearch.sql.ast.expression.IntervalUnit; +import org.opensearch.sql.ast.expression.Let; +import org.opensearch.sql.ast.expression.Span; +import org.opensearch.sql.ast.expression.SpanUnit; import org.opensearch.sql.ast.expression.UnresolvedExpression; +import org.opensearch.sql.calcite.utils.PlanUtils; /** AST node represent Timechart operation. */ @Getter @@ -49,8 +78,9 @@ public Timechart useOther(Boolean useOther) { } @Override - public Timechart attach(UnresolvedPlan child) { - return toBuilder().child(child).build(); + public UnresolvedPlan attach(UnresolvedPlan child) { + // Transform after child attached to avoid unintentionally overriding it + return toBuilder().child(child).build().transformPerFunction(); } @Override @@ -62,4 +92,112 @@ public List getChild() { public T accept(AbstractNodeVisitor nodeVisitor, C context) { return nodeVisitor.visitTimechart(this, context); } + + /** + * Transform per function to eval-based post-processing on sum result by timechart. Specifically, + * calculate how many seconds are in the time bucket based on the span option dynamically, then + * divide the aggregated sum value by the number of seconds to get the per-second rate. + * + *

For example, with span=5m per_second(field): per second rate = sum(field) / 300 seconds + * + * @return eval+timechart if per function present, or the original timechart otherwise. + */ + private UnresolvedPlan transformPerFunction() { + Optional perFuncOpt = PerFunction.from(aggregateFunction); + if (perFuncOpt.isEmpty()) { + return this; + } + + PerFunction perFunc = perFuncOpt.get(); + Span span = (Span) this.binExpression; + Field spanStartTime = AstDSL.field(IMPLICIT_FIELD_TIMESTAMP); + Function spanEndTime = timestampadd(span.getUnit(), span.getValue(), spanStartTime); + Function spanSeconds = timestampdiff(SECOND, spanStartTime, spanEndTime); + + return eval( + timechart(AstDSL.alias(perFunc.aggName, sum(perFunc.aggArg))), + let(perFunc.aggName).multiply(perFunc.seconds).dividedBy(spanSeconds)); + } + + private Timechart timechart(UnresolvedExpression newAggregateFunction) { + return this.toBuilder().aggregateFunction(newAggregateFunction).build(); + } + + /** TODO: extend to support additional per_* functions */ + @RequiredArgsConstructor + static class PerFunction { + private static final Map UNIT_SECONDS = Map.of("per_second", 1); + private final String aggName; + private final UnresolvedExpression aggArg; + private final int seconds; + + static Optional from(UnresolvedExpression aggExpr) { + if (!(aggExpr instanceof AggregateFunction)) { + return Optional.empty(); + } + + AggregateFunction aggFunc = (AggregateFunction) aggExpr; + String aggFuncName = aggFunc.getFuncName().toLowerCase(Locale.ROOT); + if (!UNIT_SECONDS.containsKey(aggFuncName)) { + return Optional.empty(); + } + + String aggName = toAggName(aggFunc); + return Optional.of( + new PerFunction(aggName, aggFunc.getField(), UNIT_SECONDS.get(aggFuncName))); + } + + private static String toAggName(AggregateFunction aggFunc) { + String fieldName = + (aggFunc.getField() instanceof Field) + ? ((Field) aggFunc.getField()).getField().toString() + : aggFunc.getField().toString(); + return String.format(Locale.ROOT, "%s(%s)", aggFunc.getFuncName(), fieldName); + } + } + + private PerFunctionRateExprBuilder let(String fieldName) { + return new PerFunctionRateExprBuilder(AstDSL.field(fieldName)); + } + + /** Fluent builder for creating Let expressions with mathematical operations. */ + static class PerFunctionRateExprBuilder { + private final Field field; + private UnresolvedExpression expr; + + PerFunctionRateExprBuilder(Field field) { + this.field = field; + this.expr = field; + } + + PerFunctionRateExprBuilder multiply(Integer multiplier) { + // Promote to double literal to avoid integer division in downstream + this.expr = + function( + MULTIPLY.getName().getFunctionName(), expr, doubleLiteral(multiplier.doubleValue())); + return this; + } + + Let dividedBy(UnresolvedExpression divisor) { + return AstDSL.let(field, function(DIVIDE.getName().getFunctionName(), expr, divisor)); + } + + static UnresolvedExpression sum(UnresolvedExpression field) { + return aggregate(SUM.getName().getFunctionName(), field); + } + + static Function timestampadd( + SpanUnit unit, UnresolvedExpression value, UnresolvedExpression timestampField) { + UnresolvedExpression intervalUnit = + stringLiteral(PlanUtils.spanUnitToIntervalUnit(unit).toString()); + return function( + TIMESTAMPADD.getName().getFunctionName(), intervalUnit, value, timestampField); + } + + static Function timestampdiff( + IntervalUnit unit, UnresolvedExpression start, UnresolvedExpression end) { + return function( + TIMESTAMPDIFF.getName().getFunctionName(), stringLiteral(unit.toString()), start, end); + } + } } diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index f4a37acd280..bcaf30cb0c2 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -1914,6 +1914,9 @@ public RelNode visitFlatten(Flatten node, CalcitePlanContext context) { /** Helper method to get the function name for proper column naming */ private String getValueFunctionName(UnresolvedExpression aggregateFunction) { + if (aggregateFunction instanceof Alias) { + return ((Alias) aggregateFunction).getName(); + } if (!(aggregateFunction instanceof AggregateFunction)) { return "value"; } diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java b/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java index 4d3bef062fa..0998120e489 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java @@ -74,6 +74,59 @@ static SpanUnit intervalUnitToSpanUnit(IntervalUnit unit) { }; } + static IntervalUnit spanUnitToIntervalUnit(SpanUnit unit) { + switch (unit) { + case MILLISECOND: + case MS: + return IntervalUnit.MICROSECOND; + case SECOND: + case SECONDS: + case SEC: + case SECS: + case S: + return IntervalUnit.SECOND; + case MINUTE: + case MINUTES: + case MIN: + case MINS: + case m: + return IntervalUnit.MINUTE; + case HOUR: + case HOURS: + case HR: + case HRS: + case H: + return IntervalUnit.HOUR; + case DAY: + case DAYS: + case D: + return IntervalUnit.DAY; + case WEEK: + case WEEKS: + case W: + return IntervalUnit.WEEK; + case MONTH: + case MONTHS: + case MON: + case M: + return IntervalUnit.MONTH; + case QUARTER: + case QUARTERS: + case QTR: + case QTRS: + case Q: + return IntervalUnit.QUARTER; + case YEAR: + case YEARS: + case Y: + return IntervalUnit.YEAR; + case UNKNOWN: + return IntervalUnit.UNKNOWN; + default: + throw new UnsupportedOperationException("Unsupported span unit: " + unit); + } + } + static RexNode makeOver( CalcitePlanContext context, BuiltinFunctionName functionName, diff --git a/core/src/test/java/org/opensearch/sql/ast/tree/TimechartTest.java b/core/src/test/java/org/opensearch/sql/ast/tree/TimechartTest.java new file mode 100644 index 00000000000..c23964d75a7 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/ast/tree/TimechartTest.java @@ -0,0 +1,155 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.opensearch.sql.ast.dsl.AstDSL.aggregate; +import static org.opensearch.sql.ast.dsl.AstDSL.alias; +import static org.opensearch.sql.ast.dsl.AstDSL.doubleLiteral; +import static org.opensearch.sql.ast.dsl.AstDSL.field; +import static org.opensearch.sql.ast.dsl.AstDSL.function; +import static org.opensearch.sql.ast.dsl.AstDSL.intLiteral; +import static org.opensearch.sql.ast.dsl.AstDSL.relation; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.opensearch.sql.ast.dsl.AstDSL; +import org.opensearch.sql.ast.expression.AggregateFunction; +import org.opensearch.sql.ast.expression.Let; +import org.opensearch.sql.ast.expression.Span; +import org.opensearch.sql.ast.expression.SpanUnit; +import org.opensearch.sql.ast.expression.UnresolvedExpression; + +class TimechartTest { + + @ParameterizedTest + @CsvSource({"1, m, MINUTE", "30, s, SECOND", "5, m, MINUTE", "2, h, HOUR", "1, d, DAY"}) + void should_transform_per_second_for_different_spans( + int spanValue, String spanUnit, String expectedIntervalUnit) { + withTimechart(span(spanValue, spanUnit), perSecond("bytes")) + .whenTransformingPerFunction() + .thenExpect( + eval( + let( + "per_second(bytes)", + divide( + multiply("per_second(bytes)", 1.0), + timestampdiff( + "SECOND", + "@timestamp", + timestampadd(expectedIntervalUnit, spanValue, "@timestamp")))), + timechart(span(spanValue, spanUnit), alias("per_second(bytes)", sum("bytes"))))); + } + + @Test + void should_not_transform_non_per_functions() { + withTimechart(span(1, "m"), sum("bytes")) + .whenTransformingPerFunction() + .thenExpect(timechart(span(1, "m"), sum("bytes"))); + } + + @Test + void should_preserve_all_fields_during_per_function_transformation() { + Timechart original = + new Timechart(relation("logs"), perSecond("bytes")) + .span(span(5, "m")) + .by(field("status")) + .limit(20) + .useOther(false); + + Timechart expected = + new Timechart(relation("logs"), alias("per_second(bytes)", sum("bytes"))) + .span(span(5, "m")) + .by(field("status")) + .limit(20) + .useOther(false); + + withTimechart(original) + .whenTransformingPerFunction() + .thenExpect( + eval( + let( + "per_second(bytes)", + divide( + multiply("per_second(bytes)", 1.0), + timestampdiff( + "SECOND", "@timestamp", timestampadd("MINUTE", 5, "@timestamp")))), + expected)); + } + + // Fluent API for readable test assertions + + private static TransformationAssertion withTimechart(Span spanExpr, AggregateFunction aggFunc) { + return new TransformationAssertion(timechart(spanExpr, aggFunc)); + } + + private static TransformationAssertion withTimechart(Timechart timechart) { + return new TransformationAssertion(timechart); + } + + private static Timechart timechart(Span spanExpr, UnresolvedExpression aggExpr) { + // Set child here because expected object won't call attach below + return new Timechart(relation("t"), aggExpr).span(spanExpr).limit(10).useOther(true); + } + + private static Span span(int value, String unit) { + return AstDSL.span(field("@timestamp"), intLiteral(value), SpanUnit.of(unit)); + } + + private static AggregateFunction perSecond(String fieldName) { + return (AggregateFunction) aggregate("per_second", field(fieldName)); + } + + private static AggregateFunction sum(String fieldName) { + return (AggregateFunction) aggregate("sum", field(fieldName)); + } + + private static Let let(String fieldName, UnresolvedExpression expression) { + return AstDSL.let(field(fieldName), expression); + } + + private static UnresolvedExpression multiply(String fieldName, double right) { + return function("*", field(fieldName), doubleLiteral(right)); + } + + private static UnresolvedExpression divide( + UnresolvedExpression left, UnresolvedExpression right) { + return function("/", left, right); + } + + private static UnresolvedExpression timestampadd(String unit, int value, String timestampField) { + return function( + "timestampadd", AstDSL.stringLiteral(unit), intLiteral(value), field(timestampField)); + } + + private static UnresolvedExpression timestampdiff( + String unit, String startField, UnresolvedExpression end) { + return function("timestampdiff", AstDSL.stringLiteral(unit), field(startField), end); + } + + private static UnresolvedPlan eval(Let letExpr, Timechart timechartExpr) { + return AstDSL.eval(timechartExpr, letExpr); + } + + private static class TransformationAssertion { + private final Timechart timechart; + private UnresolvedPlan result; + + TransformationAssertion(Timechart timechart) { + this.timechart = timechart; + } + + public TransformationAssertion whenTransformingPerFunction() { + this.result = timechart.attach(timechart.getChild().get(0)); + return this; + } + + public void thenExpect(UnresolvedPlan expected) { + assertEquals(expected, result); + } + } +} diff --git a/docs/user/ppl/cmd/timechart.rst b/docs/user/ppl/cmd/timechart.rst index a5708769035..0e1c2cf5360 100644 --- a/docs/user/ppl/cmd/timechart.rst +++ b/docs/user/ppl/cmd/timechart.rst @@ -57,14 +57,28 @@ Syntax * When set to true, values beyond the limit are grouped into an "OTHER" category. * Only applies when using the "by" clause and when there are more distinct values than the limit. +* **by**: optional. Groups the results by the specified field in addition to time intervals. + + * If not specified, the aggregation is performed across all documents in each time interval. + * **aggregation_function**: mandatory. The aggregation function to apply to each time bucket. * Currently, only a single aggregation function is supported. - * Available functions: All aggregation functions supported by the :doc:`stats ` command are supported. + * Available functions: All aggregation functions supported by the :doc:`stats ` command, as well as the timechart-specific aggregations listed below. -* **by**: optional. Groups the results by the specified field in addition to time intervals. +PER_SECOND +---------- - * If not specified, the aggregation is performed across all documents in each time interval. +Description +>>>>>>>>>>> + +Usage: per_second(field) calculates the per-second rate for a numeric field within each time bucket. + +The calculation formula is: `per_second(field) = sum(field) / span_in_seconds`, where `span_in_seconds` is the span interval in seconds. + +Note: This function is available since 3.4.0. + +Return type: DOUBLE Notes ===== @@ -332,3 +346,20 @@ PPL query:: | 2024-07-01 00:00:00 | null | 1 | +---------------------+--------+-------+ +Example 11: Calculate packets per second rate +============================================= + +This example calculates the per-second packet rate for network traffic data using the per_second() function. + +PPL query:: + + os> source=events | timechart span=30m per_second(packets) by host + fetched rows / total rows = 4/4 + +---------------------+---------+---------------------+ + | @timestamp | host | per_second(packets) | + |---------------------+---------+---------------------| + | 2023-01-01 10:00:00 | server1 | 0.1 | + | 2023-01-01 10:00:00 | server2 | 0.05 | + | 2023-01-01 10:30:00 | server1 | 0.1 | + | 2023-01-01 10:30:00 | server2 | 0.05 | + +---------------------+---------+---------------------+ diff --git a/doctest/test_data/events.json b/doctest/test_data/events.json index e873691fb90..ea63088151a 100644 --- a/doctest/test_data/events.json +++ b/doctest/test_data/events.json @@ -1,8 +1,8 @@ -{"@timestamp":"2023-01-01T10:00:00Z","event_time":"2023-01-01T09:55:00Z","host":"server1","message":"Starting up","level":"INFO","category":"orders","status":"pending"} -{"@timestamp":"2023-01-01T10:05:00Z","event_time":"2023-01-01T10:00:00Z","host":"server2","message":"Initializing","level":"INFO","category":"users","status":"active"} -{"@timestamp":"2023-01-01T10:10:00Z","event_time":"2023-01-01T10:05:00Z","host":"server1","message":"Ready to serve","level":"INFO","category":"orders","status":"processing"} -{"@timestamp":"2023-01-01T10:15:00Z","event_time":"2023-01-01T10:10:00Z","host":"server2","message":"Ready","level":"INFO","category":"users","status":"inactive"} -{"@timestamp":"2023-01-01T10:20:00Z","event_time":"2023-01-01T10:15:00Z","host":"server1","message":"Processing requests","level":"INFO","category":"orders","status":"completed"} -{"@timestamp":"2023-01-01T10:25:00Z","event_time":"2023-01-01T10:20:00Z","host":"server2","message":"Handling connections","level":"INFO","category":"users","status":"pending"} -{"@timestamp":"2023-01-01T10:30:00Z","event_time":"2023-01-01T10:25:00Z","host":"server1","message":"Shutting down","level":"WARN","category":"orders","status":"cancelled"} -{"@timestamp":"2023-01-01T10:35:00Z","event_time":"2023-01-01T10:30:00Z","host":"server2","message":"Maintenance mode","level":"WARN","category":"users","status":"inactive"} +{"@timestamp":"2023-01-01T10:00:00Z","event_time":"2023-01-01T09:55:00Z","host":"server1","message":"Starting up","level":"INFO","category":"orders","status":"pending","packets":60} +{"@timestamp":"2023-01-01T10:05:00Z","event_time":"2023-01-01T10:00:00Z","host":"server2","message":"Initializing","level":"INFO","category":"users","status":"active","packets":30} +{"@timestamp":"2023-01-01T10:10:00Z","event_time":"2023-01-01T10:05:00Z","host":"server1","message":"Ready to serve","level":"INFO","category":"orders","status":"processing","packets":60} +{"@timestamp":"2023-01-01T10:15:00Z","event_time":"2023-01-01T10:10:00Z","host":"server2","message":"Ready","level":"INFO","category":"users","status":"inactive","packets":30} +{"@timestamp":"2023-01-01T10:20:00Z","event_time":"2023-01-01T10:15:00Z","host":"server1","message":"Processing requests","level":"INFO","category":"orders","status":"completed","packets":60} +{"@timestamp":"2023-01-01T10:25:00Z","event_time":"2023-01-01T10:20:00Z","host":"server2","message":"Handling connections","level":"INFO","category":"users","status":"pending","packets":30} +{"@timestamp":"2023-01-01T10:30:00Z","event_time":"2023-01-01T10:25:00Z","host":"server1","message":"Shutting down","level":"WARN","category":"orders","status":"cancelled","packets":180} +{"@timestamp":"2023-01-01T10:35:00Z","event_time":"2023-01-01T10:30:00Z","host":"server2","message":"Maintenance mode","level":"WARN","category":"users","status":"inactive","packets":90} diff --git a/doctest/test_mapping/events.json b/doctest/test_mapping/events.json index 664f042324b..2c405a23fbb 100644 --- a/doctest/test_mapping/events.json +++ b/doctest/test_mapping/events.json @@ -23,6 +23,9 @@ }, "status": { "type": "keyword" + }, + "packets": { + "type": "integer" } } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index d755c7acc8f..28fbfc8630b 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -310,6 +310,16 @@ public void testExplainWithTimechartCount() throws IOException { assertYamlEqualsJsonIgnoreId(expected, result); } + @Test + public void testExplainTimechartPerSecond() throws IOException { + var result = explainQueryToString("source=events | timechart span=2m per_second(cpu_usage)"); + assertTrue( + result.contains( + "per_second(cpu_usage)=[DIVIDE(*($1, 1.0E0), " + + "TIMESTAMPDIFF('SECOND':VARCHAR, $0, TIMESTAMPADD('MINUTE':VARCHAR, 2, $0)))]")); + assertTrue(result.contains("per_second(cpu_usage)=[SUM($0)]")); + } + @Test public void noPushDownForAggOnWindow() throws IOException { enabledOnlyWhenPushdownIsEnabled(); diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimechartPerFunctionIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimechartPerFunctionIT.java new file mode 100644 index 00000000000..9965d459a22 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimechartPerFunctionIT.java @@ -0,0 +1,111 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.remote; + +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifySchema; + +import java.io.IOException; +import org.json.JSONObject; +import org.junit.jupiter.api.Test; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +public class CalciteTimechartPerFunctionIT extends PPLIntegTestCase { + + @Override + public void init() throws Exception { + super.init(); + enableCalcite(); + disallowCalciteFallback(); + + loadIndex(Index.EVENTS_TRAFFIC); + } + + @Test + public void testTimechartPerSecondWithDefaultSpan() throws IOException { + JSONObject result = + executeQuery( + "source=events_traffic | where month(@timestamp) = 9 | timechart per_second(packets)"); + + verifySchema( + result, schema("@timestamp", "timestamp"), schema("per_second(packets)", "double")); + verifyDataRows( + result, + rows("2025-09-08 10:00:00", 1.0), // 60 / 1m + rows("2025-09-08 10:01:00", 2.0), // 120 / 1m + rows("2025-09-08 10:02:00", 4.0)); // (60+180) / 1m + } + + @Test + public void testTimechartPerSecondWithSpecifiedSpan() throws IOException { + JSONObject result = + executeQuery( + "source=events_traffic | where month(@timestamp) = 9 | timechart span=2m" + + " per_second(packets)"); + + verifySchema( + result, schema("@timestamp", "timestamp"), schema("per_second(packets)", "double")); + verifyDataRows( + result, + rows("2025-09-08 10:00:00", 1.5), // (60+120) / 2m + rows("2025-09-08 10:02:00", 2.0)); // (60+180) / 2m + } + + @Test + public void testTimechartPerSecondWithByClause() throws IOException { + JSONObject result = + executeQuery( + "source=events_traffic | where month(@timestamp) = 9 | timechart span=2m" + + " per_second(packets) by host"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("host", "string"), + schema("per_second(packets)", "double")); + verifyDataRows( + result, + rows("2025-09-08 10:00:00", "server1", 1.5), // (60+120) / 2m + rows("2025-09-08 10:02:00", "server1", 0.5), // 60 / 2m + rows("2025-09-08 10:02:00", "server2", 1.5)); // 180 / 2m + } + + @Test + public void testTimechartPerSecondWithLimitAndByClause() throws IOException { + JSONObject result = + executeQuery( + "source=events_traffic | where month(@timestamp) = 9 | timechart span=2m limit=1" + + " per_second(packets) by host"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("host", "string"), + schema("per_second(packets)", "double")); + verifyDataRows( + result, + rows("2025-09-08 10:00:00", "server1", 1.5), + rows("2025-09-08 10:02:00", "server1", 0.5), + rows("2025-09-08 10:02:00", "OTHER", 1.5)); + } + + @Test + public void testTimechartPerSecondWithVariableMonthLengths() throws IOException { + JSONObject result = + executeQuery( + "source=events_traffic | where month(@timestamp) != 9 | timechart span=1M" + + " per_second(packets)"); + + verifySchema( + result, schema("@timestamp", "timestamp"), schema("per_second(packets)", "double")); + verifyDataRows( + result, + rows("2025-02-01 00:00:00", 7.75), // 18748800 / 28 days' seconds + rows("2025-10-01 00:00:00", 7.0)); // 18748800 / 31 days' seconds + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index fb3cad3f9f4..80616df7bfa 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -906,7 +906,12 @@ public enum Index { "events_null", "events_null", "{\"mappings\":{\"properties\":{\"@timestamp\":{\"type\":\"date\"},\"host\":{\"type\":\"text\"},\"cpu_usage\":{\"type\":\"double\"},\"region\":{\"type\":\"keyword\"}}}}", - "src/test/resources/events_null.json"); + "src/test/resources/events_null.json"), + EVENTS_TRAFFIC( + "events_traffic", + "events_traffic", + getMappingFile("events_traffic_index_mapping.json"), + "src/test/resources/events_traffic.json"); private final String name; private final String type; diff --git a/integ-test/src/test/resources/events_traffic.json b/integ-test/src/test/resources/events_traffic.json new file mode 100644 index 00000000000..bcb3fe17f2a --- /dev/null +++ b/integ-test/src/test/resources/events_traffic.json @@ -0,0 +1,12 @@ +{"index":{"_id":"1"}} +{"@timestamp":"2025-09-08T10:00:00","packets":60,"host":"server1"} +{"index":{"_id":"2"}} +{"@timestamp":"2025-09-08T10:01:00","packets":120,"host":"server1"} +{"index":{"_id":"3"}} +{"@timestamp":"2025-09-08T10:02:00","packets":60,"host":"server1"} +{"index":{"_id":"4"}} +{"@timestamp":"2025-09-08T10:02:30","packets":180,"host":"server2"} +{"index":{"_id":"5"}} +{"@timestamp":"2025-02-15T14:00:00","packets":18748800,"host":"server1"} +{"index":{"_id":"6"}} +{"@timestamp":"2025-10-15T14:00:00","packets":18748800,"host":"server1"} diff --git a/integ-test/src/test/resources/indexDefinitions/events_traffic_index_mapping.json b/integ-test/src/test/resources/indexDefinitions/events_traffic_index_mapping.json new file mode 100644 index 00000000000..d8fe3b4cf55 --- /dev/null +++ b/integ-test/src/test/resources/indexDefinitions/events_traffic_index_mapping.json @@ -0,0 +1,15 @@ +{ + "mappings": { + "properties": { + "@timestamp": { + "type": "date" + }, + "packets": { + "type": "integer" + }, + "host": { + "type": "keyword" + } + } + } +} \ No newline at end of file diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 2e03314c5f7..e13447b68e9 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -667,6 +667,7 @@ statsFunction | takeAggFunction # takeAggFunctionCall | valuesAggFunction # valuesAggFunctionCall | percentileApproxFunction # percentileApproxFunctionCall + | perFunction # perFunctionCall | statsFunctionName LT_PRTHS functionArgs RT_PRTHS # statsFunctionCall ; @@ -703,6 +704,10 @@ percentileApproxFunction COMMA percent = numericLiteral (COMMA compression = numericLiteral)? RT_PRTHS ; +perFunction + : funcName=PER_SECOND LT_PRTHS functionArg RT_PRTHS + ; + numericLiteral : integerLiteral | decimalLiteral diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java index 20aec5b094a..f037376f5c2 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstExpressionBuilder.java @@ -21,6 +21,7 @@ import java.util.stream.Stream; import org.antlr.v4.runtime.ParserRuleContext; import org.antlr.v4.runtime.RuleContext; +import org.antlr.v4.runtime.tree.ParseTree; import org.opensearch.sql.ast.dsl.AstDSL; import org.opensearch.sql.ast.expression.*; import org.opensearch.sql.ast.expression.subquery.ExistsSubquery; @@ -60,6 +61,7 @@ import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.MultiFieldRelevanceFunctionContext; import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.PatternMethodContext; import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.PatternModeContext; +import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.PerFunctionCallContext; import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.RenameFieldExpressionContext; import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.SingleFieldRelevanceFunctionContext; import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.SortFieldContext; @@ -67,6 +69,7 @@ import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.StatsFunctionCallContext; import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.StringLiteralContext; import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.TableSourceContext; +import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.TimechartCommandContext; import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser.WcFieldExpressionContext; import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParserBaseVisitor; import org.opensearch.sql.ppl.utils.ArgumentFactory; @@ -547,6 +550,18 @@ private List timestampFunctionArguments( return args; } + @Override + public UnresolvedExpression visitPerFunctionCall(PerFunctionCallContext ctx) { + ParseTree parent = ctx.getParent(); + String perFuncName = ctx.perFunction().funcName.getText(); + if (!(parent instanceof TimechartCommandContext)) { + throw new SyntaxCheckException( + perFuncName + " function can only be used within timechart command"); + } + return buildAggregateFunction( + perFuncName, Collections.singletonList(ctx.perFunction().functionArg())); + } + /** Literal and value. */ @Override public UnresolvedExpression visitIdentsAsQualifiedName(IdentsAsQualifiedNameContext ctx) { diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/antlr/PPLSyntaxParserTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/antlr/PPLSyntaxParserTest.java index e4d7f912d89..d5e4f363550 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/antlr/PPLSyntaxParserTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/antlr/PPLSyntaxParserTest.java @@ -99,6 +99,12 @@ public void testSearchFieldsCommandCrossClusterShouldPass() { assertNotEquals(null, tree); } + @Test + public void testPerSecondFunctionInTimechartShouldPass() { + ParseTree tree = new PPLSyntaxParser().parse("source=t | timechart per_second(a)"); + assertNotEquals(null, tree); + } + @Test public void testDynamicSourceClauseParseTreeStructure() { String query = "source=[myindex, logs, fieldIndex=\"test\", count=100]"; diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLTimechartTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLTimechartTest.java index 703850ccfff..356a2b0cede 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLTimechartTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLTimechartTest.java @@ -81,6 +81,20 @@ public void testTimechartBasic() { verifyPPLToSparkSQL(root, expectedSparkSql); } + @Test + public void testTimechartPerSecond() { + withPPLQuery("source=events | timechart per_second(cpu_usage)") + .expectSparkSQL( + "SELECT `@timestamp`, `DIVIDE`(`per_second(cpu_usage)` * 1.0E0, TIMESTAMPDIFF('SECOND'," + + " `@timestamp`, TIMESTAMPADD('MINUTE', 1, `@timestamp`)))" + + " `per_second(cpu_usage)`\n" + + "FROM (SELECT `SPAN`(`@timestamp`, 1, 'm') `@timestamp`, SUM(`cpu_usage`)" + + " `per_second(cpu_usage)`\n" + + "FROM `scott`.`events`\n" + + "GROUP BY `SPAN`(`@timestamp`, 1, 'm')\n" + + "ORDER BY 1 NULLS LAST) `t2`"); + } + @Test public void testTimechartWithSpan() { String ppl = "source=events | timechart span=1h count()"; diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java index 2ed8059b87e..a3d6f686af6 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java @@ -22,6 +22,7 @@ import static org.opensearch.sql.ast.dsl.AstDSL.defaultSortFieldArgs; import static org.opensearch.sql.ast.dsl.AstDSL.defaultStatsArgs; import static org.opensearch.sql.ast.dsl.AstDSL.describe; +import static org.opensearch.sql.ast.dsl.AstDSL.doubleLiteral; import static org.opensearch.sql.ast.dsl.AstDSL.eval; import static org.opensearch.sql.ast.dsl.AstDSL.exprList; import static org.opensearch.sql.ast.dsl.AstDSL.field; @@ -75,6 +76,7 @@ import org.opensearch.sql.ast.tree.Kmeans; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.RareTopN.CommandType; +import org.opensearch.sql.ast.tree.Timechart; import org.opensearch.sql.common.antlr.SyntaxCheckException; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.common.setting.Settings.Key; @@ -1089,6 +1091,36 @@ public void testPatternsWithoutArguments() { ImmutableMap.of())); } + @Test + public void testTimechartWithPerSecondFunction() { + assertEqual( + "source=t | timechart per_second(a)", + eval( + new Timechart(relation("t"), alias("per_second(a)", aggregate("sum", field("a")))) + .span(span(field("@timestamp"), intLiteral(1), SpanUnit.of("m"))) + .limit(10) + .useOther(true), + let( + field("per_second(a)"), + function( + "/", + function("*", field("per_second(a)"), doubleLiteral(1.0)), + function( + "timestampdiff", + stringLiteral("SECOND"), + field("@timestamp"), + function( + "timestampadd", + stringLiteral("MINUTE"), + intLiteral(1), + field("@timestamp"))))))); + } + + @Test + public void testStatsWithPerSecondThrowsException() { + assertThrows(SyntaxCheckException.class, () -> plan("source=t | stats per_second(a)")); + } + protected void assertEqual(String query, Node expectedPlan) { Node actualPlan = plan(query); assertEquals(expectedPlan, actualPlan);