Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.opensearch.sql.ast.tree.Trendline;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.calcite.plan.OpenSearchConstants;

/** Class of static methods to create specific node instances. */
@UtilityClass
Expand Down Expand Up @@ -491,15 +492,19 @@ public static Span spanFromSpanLengthLiteral(
UnresolvedExpression field, Literal spanLengthLiteral) {
if (spanLengthLiteral.getType() == DataType.STRING) {
String spanText = spanLengthLiteral.getValue().toString();
String valueStr = spanText.replaceAll("[^0-9]", "");
String unitStr = spanText.replaceAll("[0-9]", "");
String valueStr = spanText.replaceAll("[^0-9-]", "");
String unitStr = spanText.replaceAll("[0-9-]", "");

if (valueStr.isEmpty()) {
// No numeric value found, use the literal as-is
return new Span(field, spanLengthLiteral, SpanUnit.NONE);
} else {
// Parse numeric value and unit
Integer value = Integer.parseInt(valueStr);
if (value <= 0) {
throw new IllegalArgumentException(
String.format("Zero or negative time interval not supported: %s", spanText));
}
SpanUnit unit = unitStr.isEmpty() ? SpanUnit.NONE : SpanUnit.of(unitStr);
return span(field, intLiteral(value), unit);
}
Expand Down Expand Up @@ -713,4 +718,9 @@ public static Bin bin(UnresolvedExpression field, Argument... arguments) {
return DefaultBin.builder().field(field).alias(alias).build();
}
}

/** Get a reference to the implicit timestamp field {@code @timestamp} */
public static Field referImplicitTimestampField() {
return AstDSL.field(OpenSearchConstants.IMPLICIT_FIELD_TIMESTAMP);
}
}
21 changes: 18 additions & 3 deletions docs/user/ppl/cmd/stats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ stats [bucket_nullable=bool] <aggregation>... [by-clause]

* span-expression: optional, at most one.

* Syntax: span(field_expr, interval_expr)
* Description: The unit of the interval expression is the natural unit by default. **If the field is a date/time type field, the aggregation results always ignore null bucket**. And the interval is in date/time units, you will need to specify the unit in the interval expression. For example, to split the field ``age`` into buckets by 10 years, it looks like ``span(age, 10)``. And here is another example of time span, the span to split a ``timestamp`` field into hourly intervals, it looks like ``span(timestamp, 1h)``.
* Syntax: span([field_expr,] interval_expr)
* Description: The unit of the interval expression is the natural unit by default. If ``field_expr`` is omitted, span will use the implicit ``@timestamp`` field. An error will be thrown if this field doesn't exist. **If the field is a date/time type field, the aggregation results always ignore null bucket**. And the interval is in date/time units, you will need to specify the unit in the interval expression. For example, to split the field ``age`` into buckets by 10 years, it looks like ``span(age, 10)``. And here is another example of time span, the span to split a ``timestamp`` field into hourly intervals, it looks like ``span(timestamp, 1h)``.
* Available time unit:

+----------------------------+
Expand Down Expand Up @@ -580,7 +580,7 @@ Description

Version: 3.3.0 (Calcite engine only)

Usage: LIST(expr). Collects all values from the specified expression into an array. Values are converted to strings, nulls are filtered, and duplicates are preserved.
Usage: LIST(expr). Collects all values from the specified expression into an array. Values are converted to strings, nulls are filtered, and duplicates are preserved.
The function returns up to 100 values with no guaranteed ordering.

* expr: The field expression to collect values from.
Expand Down Expand Up @@ -977,3 +977,18 @@ PPL query::
| 1 | 2025-01-01 | 2 |
+-----+------------+--------+


Example 18: Calculate the count by the implicit @timestamp field
================================================================

This example demonstrates that if you omit the field parameter in the span function, it will automatically use the implicit ``@timestamp`` field.

PPL query::

PPL> source=big5 | stats count() by span(1month)
fetched rows / total rows = 1/1
+---------+---------------------+
| count() | span(1month) |
|---------+---------------------|
| 1 | 2023-01-01 00:00:00 |
+---------+---------------------+
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static org.opensearch.sql.util.MatcherUtils.rows;
import static org.opensearch.sql.util.MatcherUtils.schema;
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
import static org.opensearch.sql.util.MatcherUtils.verifyErrorMessageContains;
import static org.opensearch.sql.util.MatcherUtils.verifySchema;
import static org.opensearch.sql.util.MatcherUtils.verifySchemaInOrder;

Expand All @@ -25,6 +26,8 @@
import org.json.JSONObject;
import org.junit.jupiter.api.Test;
import org.opensearch.client.Request;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.ppl.PPLIntegTestCase;

public class CalcitePPLAggregationIT extends PPLIntegTestCase {
Expand All @@ -41,6 +44,7 @@ public void init() throws Exception {
loadIndex(Index.CALCS);
loadIndex(Index.DATE_FORMATS);
loadIndex(Index.DATA_TYPE_NUMERIC);
loadIndex(Index.BIG5);
loadIndex(Index.LOGS);
loadIndex(Index.TELEMETRY);
loadIndex(Index.TIME_TEST_DATA);
Expand Down Expand Up @@ -729,6 +733,23 @@ public void testCountBySpanForCustomFormats() throws IOException {
verifyDataRows(actual, rows(1, "00:00:00"), rows(1, "12:00:00"));
}

// Only available in v3 with Calcite
@Test
public void testSpanByImplicitTimestamp() throws IOException {
JSONObject result = executeQuery("source=big5 | stats count() by span(1d) as span");
verifySchema(result, schema("count()", "bigint"), schema("span", "timestamp"));
verifyDataRows(result, rows(1, "2023-01-02 00:00:00"));

Throwable t =
assertThrowsWithReplace(
SemanticCheckException.class,
() ->
executeQuery(
StringUtils.format(
"source=%s | stats count() by span(5m)", TEST_INDEX_DATE_FORMATS)));
verifyErrorMessageContains(t, "Field [@timestamp] not found");
}

@Test
public void testCountDistinct() throws IOException {
JSONObject actual =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
setup:
- do:
query.settings:
body:
transient:
plugins.calcite.enabled : true
- do:
indices.create:
index: test_timechart_span_validation
body:
mappings:
properties:
"@timestamp":
type: date_nanos
packets:
type: long
- do:
bulk:
index: test_timechart_span_validation
refresh: true
body:
- '{"index": {}}'
- '{"@timestamp": "2024-01-15T10:30:04.567890123Z", "packets": 100}'
- '{"index": {}}'
- '{"@timestamp": "2024-01-15T10:31:04.567890123Z", "packets": 150}'
- '{"index": {}}'
- '{"@timestamp": "2024-01-15T10:32:04.567890123Z", "packets": 120}'

---
teardown:
- do:
query.settings:
body:
transient:
plugins.calcite.enabled : false

---
"timechart with zero span should return validation error":
- skip:
features:
- headers
- allowed_warnings
- do:
catch: bad_request
headers:
Content-Type: 'application/json'
ppl:
body:
query: source=test_timechart_span_validation | timechart span=0m per_second(packets)
- match: {"$body": "/Zero\\s+or\\s+negative\\s+time\\s+interval\\s+not\\s+supported/"}
8 changes: 2 additions & 6 deletions ppl/src/main/antlr/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,8 @@ timechartCommand
;

timechartParameter
: (spanClause | SPAN EQUAL spanLiteral)
| timechartArg
;

timechartArg
: LIMIT EQUAL integerLiteral
| SPAN EQUAL spanLiteral
| USEOTHER EQUAL (booleanLiteral | ident)
;

Expand Down Expand Up @@ -615,7 +611,7 @@ bySpanClause
;

spanClause
: SPAN LT_PRTHS fieldExpression COMMA value = spanLiteral RT_PRTHS
: SPAN LT_PRTHS (fieldExpression COMMA)? value = spanLiteral RT_PRTHS
;

sortbyClause
Expand Down
39 changes: 10 additions & 29 deletions ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -600,39 +600,20 @@ public UnresolvedPlan visitReverseCommand(OpenSearchPPLParser.ReverseCommandCont
@Override
public UnresolvedPlan visitTimechartCommand(OpenSearchPPLParser.TimechartCommandContext ctx) {
UnresolvedExpression binExpression =
AstDSL.span(AstDSL.field("@timestamp"), AstDSL.intLiteral(1), SpanUnit.of("m"));
AstDSL.span(AstDSL.referImplicitTimestampField(), AstDSL.intLiteral(1), SpanUnit.m);
Integer limit = 10;
Boolean useOther = true;
// Process timechart parameters
for (OpenSearchPPLParser.TimechartParameterContext paramCtx : ctx.timechartParameter()) {
if (paramCtx.spanClause() != null) {
binExpression = internalVisitExpression(paramCtx.spanClause());
} else if (paramCtx.spanLiteral() != null) {
Literal literal = (Literal) internalVisitExpression(paramCtx.spanLiteral());
binExpression = AstDSL.spanFromSpanLengthLiteral(AstDSL.field("@timestamp"), literal);
} else if (paramCtx.timechartArg() != null) {
OpenSearchPPLParser.TimechartArgContext argCtx = paramCtx.timechartArg();
if (argCtx.LIMIT() != null && argCtx.integerLiteral() != null) {
limit = Integer.parseInt(argCtx.integerLiteral().getText());
if (limit < 0) {
throw new IllegalArgumentException("Limit must be a non-negative number");
}
} else if (argCtx.USEOTHER() != null) {
if (argCtx.booleanLiteral() != null) {
useOther = Boolean.parseBoolean(argCtx.booleanLiteral().getText());
} else if (argCtx.ident() != null) {
String useOtherValue = argCtx.ident().getText().toLowerCase();
if ("true".equals(useOtherValue) || "t".equals(useOtherValue)) {
useOther = true;
} else if ("false".equals(useOtherValue) || "f".equals(useOtherValue)) {
useOther = false;
} else {
throw new IllegalArgumentException(
"Invalid useOther value: "
+ argCtx.ident().getText()
+ ". Expected true/false or t/f");
}
}
UnresolvedExpression param = internalVisitExpression(paramCtx);
if (param instanceof Span) {
binExpression = param;
} else if (param instanceof Literal literal) {
if (DataType.BOOLEAN.equals(literal.getType())) {
useOther = (Boolean) literal.getValue();
} else if (DataType.INTEGER.equals(literal.getType())
|| DataType.LONG.equals(literal.getType())) {
limit = (Integer) literal.getValue();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ public UnresolvedExpression visitSpanClause(SpanClauseContext ctx) {
if (ctx.fieldExpression() != null) {
fieldExpression = visit(ctx.fieldExpression());
} else {
fieldExpression = AstDSL.field("@timestamp");
fieldExpression = AstDSL.referImplicitTimestampField();
}
Literal literal = (Literal) visit(ctx.value);
return AstDSL.spanFromSpanLengthLiteral(fieldExpression, literal);
Expand Down Expand Up @@ -934,6 +934,47 @@ public UnresolvedExpression visitTimeModifierValue(
return AstDSL.stringLiteral(osDateMathExpression);
}

@Override
public UnresolvedExpression visitTimechartParameter(
OpenSearchPPLParser.TimechartParameterContext ctx) {
UnresolvedExpression timechartParameter;
if (ctx.SPAN() != null) {
// Convert span=1h to span(@timestamp, 1h)
Literal spanLiteral = (Literal) visit(ctx.spanLiteral());
timechartParameter =
AstDSL.spanFromSpanLengthLiteral(AstDSL.referImplicitTimestampField(), spanLiteral);
} else if (ctx.LIMIT() != null) {
Literal limit = (Literal) visit(ctx.integerLiteral());
if ((Integer) limit.getValue() < 0) {
throw new IllegalArgumentException("Limit must be a non-negative number");
}
timechartParameter = limit;
} else if (ctx.USEOTHER() != null) {
UnresolvedExpression useOther;
if (ctx.booleanLiteral() != null) {
useOther = visit(ctx.booleanLiteral());
} else if (ctx.ident() != null) {
QualifiedName ident = visitIdentifiers(List.of(ctx.ident()));
String useOtherValue = ident.toString();
if ("true".equalsIgnoreCase(useOtherValue) || "t".equalsIgnoreCase(useOtherValue)) {
useOther = AstDSL.booleanLiteral(true);
} else if ("false".equalsIgnoreCase(useOtherValue) || "f".equalsIgnoreCase(useOtherValue)) {
useOther = AstDSL.booleanLiteral(false);
} else {
throw new IllegalArgumentException(
"Invalid useOther value: " + ctx.ident().getText() + ". Expected true/false or t/f");
}
} else {
throw new IllegalArgumentException("value for useOther must be a boolean or identifier");
}
timechartParameter = useOther;
} else {
throw new IllegalArgumentException(
String.format("A parameter of timechart must be a span, limit or useOther, got %s", ctx));
}
return timechartParameter;
}

/**
* Process time range expressions (EARLIEST='value' or LATEST='value') It creates a Comparison
* filter like @timestamp >= timeModifierValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.ppl.calcite;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;

import com.google.common.collect.ImmutableList;
import java.util.List;
Expand Down Expand Up @@ -342,6 +343,13 @@ public void testTimechartWithUseOtherBeforeLimit() {
assertNotNull(plan);
}

@Test
public void testTimechartUsingZeroSpanShouldThrow() {
String ppl = "source=events | timechart span=0h limit=5 count() by host";
Throwable t = assertThrows(IllegalArgumentException.class, () -> parsePPL(ppl));
verifyErrorMessageContains(t, "Zero or negative time interval not supported: 0h");
}

private UnresolvedPlan parsePPL(String query) {
PPLSyntaxParser parser = new PPLSyntaxParser();
AstBuilder astBuilder = new AstBuilder(query);
Expand Down
Loading
Loading