Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public void init() throws Exception {
enableCalcite();
loadIndex(Index.ACCOUNT);
loadIndex(Index.BANK);
loadIndex(Index.EVENTS_NULL);
loadIndex(Index.TIME_TEST_DATA);
}

Expand Down Expand Up @@ -865,4 +866,86 @@ public void testBinFloatingPointSpanWithStatsCount() throws IOException {
// Test floating point spans with stats aggregation - verify proper decimal formatting
verifyDataRows(result, rows(279L, "0.0-15000.5"), rows(319L, "15000.5-30001.0"));
}

/**
 * Runs {@code bin @timestamp bins=N | stats count() by @timestamp} against the events_null index
 * for N = 3, 6 and 100 and checks the returned schema and bucket rows.
 */
@Test
public void testStatsWithBinsOnTimeField_Count() throws IOException {
  // TODO: Remove this after addressing https://github.com/opensearch-project/sql/issues/4317
  enabledOnlyWhenPushdownIsEnabled();

  // bins=3: auto_date_histogram will choose span=5m.
  JSONObject threeBins =
      executeQuery("source=events_null | bin @timestamp bins=3 | stats count() by @timestamp");
  // TODO: @timestamp is currently reported as a string; it should keep date as its type, to be
  // addressed by https://github.com/opensearch-project/sql/issues/4317
  verifySchema(
      threeBins, schema("count()", null, "bigint"), schema("@timestamp", null, "string"));
  verifyDataRows(threeBins, rows(5, "2024-07-01 00:00:00"), rows(1, "2024-07-01 00:05:00"));

  // bins=6: auto_date_histogram will choose span=1m.
  JSONObject sixBins =
      executeQuery("source=events_null | bin @timestamp bins=6 | stats count() by @timestamp");
  verifyDataRows(
      sixBins,
      rows(1, "2024-07-01 00:00:00"),
      rows(1, "2024-07-01 00:01:00"),
      rows(1, "2024-07-01 00:02:00"),
      rows(1, "2024-07-01 00:03:00"),
      rows(1, "2024-07-01 00:04:00"),
      rows(1, "2024-07-01 00:05:00"));

  // bins=100: auto_date_histogram will choose span=5s. That produces many empty buckets, but
  // they are filtered out, leaving only the 6 non-empty buckets.
  JSONObject hundredBins =
      executeQuery("source=events_null | bin @timestamp bins=100 | stats count() by @timestamp");
  verifyDataRows(
      hundredBins,
      rows(1, "2024-07-01 00:00:00"),
      rows(1, "2024-07-01 00:01:00"),
      rows(1, "2024-07-01 00:02:00"),
      rows(1, "2024-07-01 00:03:00"),
      rows(1, "2024-07-01 00:04:00"),
      rows(1, "2024-07-01 00:05:00"));
}

/**
 * Runs {@code bin @timestamp bins=N | stats avg(cpu_usage) by @timestamp} against the events_null
 * index for N = 3, 6 and 100 and checks the returned schema and bucket rows.
 */
@Test
public void testStatsWithBinsOnTimeField_Avg() throws IOException {
  // TODO: Remove this after addressing https://github.com/opensearch-project/sql/issues/4317
  enabledOnlyWhenPushdownIsEnabled();

  // bins=3: auto_date_histogram will choose span=5m.
  JSONObject threeBins =
      executeQuery(
          "source=events_null | bin @timestamp bins=3 | stats avg(cpu_usage) by @timestamp");
  // TODO: @timestamp is currently reported as a string; it should keep date as its type, to be
  // addressed by https://github.com/opensearch-project/sql/issues/4317
  verifySchema(
      threeBins,
      schema("avg(cpu_usage)", null, "double"),
      schema("@timestamp", null, "string"));
  verifyDataRows(
      threeBins, rows(44.62, "2024-07-01 00:00:00"), rows(50.0, "2024-07-01 00:05:00"));

  // bins=6: auto_date_histogram will choose span=1m.
  JSONObject sixBins =
      executeQuery(
          "source=events_null | bin @timestamp bins=6 | stats avg(cpu_usage) by @timestamp");
  verifyDataRows(
      sixBins,
      rows(45.2, "2024-07-01 00:00:00"),
      rows(38.7, "2024-07-01 00:01:00"),
      rows(55.3, "2024-07-01 00:02:00"),
      rows(42.1, "2024-07-01 00:03:00"),
      rows(41.8, "2024-07-01 00:04:00"),
      rows(50.0, "2024-07-01 00:05:00"));

  // bins=100: auto_date_histogram will choose span=5s. That produces many empty buckets, but
  // they are filtered out, leaving only the 6 non-empty buckets.
  JSONObject hundredBins =
      executeQuery(
          "source=events_null | bin @timestamp bins=100 | stats avg(cpu_usage) by @timestamp");
  verifyDataRows(
      hundredBins,
      rows(45.2, "2024-07-01 00:00:00"),
      rows(38.7, "2024-07-01 00:01:00"),
      rows(55.3, "2024-07-01 00:02:00"),
      rows(42.1, "2024-07-01 00:03:00"),
      rows(41.8, "2024-07-01 00:04:00"),
      rows(50.0, "2024-07-01 00:05:00"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,23 @@ public void testExplainBinWithBins() throws IOException {
explainQueryToString("source=opensearch-sql_test_index_account | bin age bins=3 | head 5"));
}

/**
 * Compares the explain output of {@code bin @timestamp bins=3} followed by {@code stats count()}
 * and by {@code stats avg(cpu_usage)} against the stored expected YAML plans.
 */
@Test
public void testExplainStatsWithBinsOnTimeField() throws IOException {
  // TODO: Remove this after addressing https://github.com/opensearch-project/sql/issues/4317
  enabledOnlyWhenPushdownIsEnabled();

  // Plan for the count() aggregation.
  String expectedCountPlan = loadExpectedPlan("explain_stats_bins_on_time.yaml");
  String actualCountPlan =
      explainQueryToString("source=events | bin @timestamp bins=3 | stats count() by @timestamp");
  assertYamlEqualsJsonIgnoreId(expectedCountPlan, actualCountPlan);

  // Plan for the avg(cpu_usage) aggregation.
  String expectedAvgPlan = loadExpectedPlan("explain_stats_bins_on_time2.yaml");
  String actualAvgPlan =
      explainQueryToString(
          "source=events | bin @timestamp bins=3 | stats avg(cpu_usage) by @timestamp");
  assertYamlEqualsJsonIgnoreId(expectedAvgPlan, actualAvgPlan);
}

@Test
public void testExplainBinWithSpan() throws IOException {
String expected = loadExpectedPlan("explain_bin_span.json");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(count()=[$1], @timestamp=[$0])
LogicalAggregate(group=[{0}], count()=[COUNT()])
LogicalProject(@timestamp=[WIDTH_BUCKET($1, 3, -(MAX($1) OVER (), MIN($1) OVER ()), MAX($1) OVER ())])
CalciteLogicalIndexScan(table=[[OpenSearch, events]])
physical: |
EnumerableLimit(fetch=[10000])
EnumerableCalc(expr#0..1=[{inputs}], expr#2=[0], expr#3=[>($t1, $t2)], count()=[$t1], @timestamp=[$t0], $condition=[$t3])
CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT())], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"@timestamp":{"auto_date_histogram":{"field":"@timestamp","buckets":3,"minimum_interval":null},"aggregations":{"count()":{"value_count":{"field":"_index"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
calcite:
logical: |
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(avg(cpu_usage)=[$1], @timestamp=[$0])
LogicalAggregate(group=[{0}], avg(cpu_usage)=[AVG($1)])
LogicalProject(@timestamp=[WIDTH_BUCKET($1, 3, -(MAX($1) OVER (), MIN($1) OVER ()), MAX($1) OVER ())], cpu_usage=[$7])
CalciteLogicalIndexScan(table=[[OpenSearch, events]])
physical: |
EnumerableLimit(fetch=[10000])
EnumerableCalc(expr#0..1=[{inputs}], expr#2=[IS NOT NULL($t1)], avg(cpu_usage)=[$t1], @timestamp=[$t0], $condition=[$t2])
CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},avg(cpu_usage)=AVG($1))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"@timestamp":{"auto_date_histogram":{"field":"@timestamp","buckets":3,"minimum_interval":null},"aggregations":{"avg(cpu_usage)":{"avg":{"field":"cpu_usage"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static org.opensearch.sql.data.type.ExprCoreType.STRUCT;
import static org.opensearch.sql.data.type.ExprCoreType.TIME;
import static org.opensearch.sql.data.type.ExprCoreType.TIMESTAMP;
import static org.opensearch.sql.utils.DateTimeFormatters.DATE_TIME_FORMATTER;
import static org.opensearch.sql.utils.DateTimeFormatters.STRICT_HOUR_MINUTE_SECOND_FORMATTER;
import static org.opensearch.sql.utils.DateTimeFormatters.STRICT_YEAR_MONTH_DAY_FORMATTER;
import static org.opensearch.sql.utils.DateTimeUtils.UTC_ZONE_ID;
Expand All @@ -30,6 +29,7 @@
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeParseException;
import java.time.temporal.TemporalAccessor;
Expand Down Expand Up @@ -333,6 +333,11 @@ private static ExprValue createOpenSearchDateType(Content value, ExprType type)
return parseDateTimeString(value.stringValue(), dt);
}

if (value.objectValue() instanceof ZonedDateTime) {
ZonedDateTime zonedDateTime = (ZonedDateTime) value.objectValue();
return new ExprTimestampValue(zonedDateTime.withZoneSameLocal(ZoneOffset.UTC).toInstant());
}

return new ExprTimestampValue((Instant) value.objectValue());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,21 @@
*/
package org.opensearch.sql.opensearch.planner.physical;

import static org.opensearch.sql.expression.function.PPLBuiltinOperators.WIDTH_BUCKET;

import java.util.List;
import java.util.function.Predicate;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.AbstractRelNode;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.sql.SqlKind;
import org.immutables.value.Value;
import org.opensearch.sql.calcite.type.ExprSqlType;
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory.ExprUDT;
import org.opensearch.sql.opensearch.storage.scan.CalciteLogicalIndexScan;

/** Planner rule that pushes a {@link LogicalAggregate} down to {@link CalciteLogicalIndexScan}. */
Expand All @@ -29,6 +37,13 @@ public void onMatch(RelOptRuleCall call) {
final LogicalAggregate aggregate = call.rel(0);
final LogicalProject project = call.rel(1);
final CalciteLogicalIndexScan scan = call.rel(2);

// With multiple group-by keys we currently have to use CompositeAggregationBuilder, which
// does not support auto_date_histogram (what the bin command's bins parameter maps to).
if (aggregate.getGroupSet().length() > 1 && Config.containsWidthBucketFuncOnDate(project)) {
return;
}

apply(call, aggregate, project, scan);
} else if (call.rels.length == 2) {
// case of count() without group-by
Expand All @@ -48,9 +63,9 @@ protected void apply(
LogicalAggregate aggregate,
LogicalProject project,
CalciteLogicalIndexScan scan) {
CalciteLogicalIndexScan newScan = scan.pushDownAggregate(aggregate, project);
if (newScan != null) {
call.transformTo(newScan);
AbstractRelNode newRelNode = scan.pushDownAggregate(aggregate, project);
if (newRelNode != null) {
call.transformTo(newRelNode);
}
}

Expand All @@ -68,9 +83,13 @@ public interface Config extends RelRule.Config {
b1 ->
b1.operand(LogicalProject.class)
.predicate(
// Don't push down aggregate on window function
// Support push down aggregate with project that:
// 1. No RexOver and no duplicate projection
// 2. Contains width_bucket function on date field referring
// to bin command with parameter bins
Predicate.not(OpenSearchIndexScanRule::containsRexOver)
.and(OpenSearchIndexScanRule::distinctProjectList))
.and(OpenSearchIndexScanRule::distinctProjectList)
.or(Config::containsWidthBucketFuncOnDate))
.oneInput(
b2 ->
b2.operand(CalciteLogicalIndexScan.class)
Expand Down Expand Up @@ -108,5 +127,20 @@ public interface Config extends RelRule.Config {
default OpenSearchAggregateIndexScanRule toRule() {
return new OpenSearchAggregateIndexScanRule(this);
}

/**
 * Tells whether any expression projected by {@code project} is a {@code WIDTH_BUCKET} call whose
 * first operand has a date-related type (see {@link #dateRelatedType(RelDataType)}).
 *
 * @param project the project node whose expressions are inspected
 * @return {@code true} if at least one projected expression matches, {@code false} otherwise
 */
static boolean containsWidthBucketFuncOnDate(LogicalProject project) {
  return project.getProjects().stream()
      // Only function calls can be WIDTH_BUCKET; anything else is ignored.
      .filter(RexCall.class::isInstance)
      .map(RexCall.class::cast)
      .filter(call -> call.getOperator().equals(WIDTH_BUCKET))
      // The first operand is the value being bucketed.
      .anyMatch(call -> dateRelatedType(call.getOperands().get(0).getType()));
}

/**
 * Tells whether {@code type} is an {@link ExprSqlType} whose UDT is one of {@code EXPR_DATE},
 * {@code EXPR_TIME} or {@code EXPR_TIMESTAMP}.
 *
 * @param type the Calcite type to inspect
 * @return {@code true} for the three date/time UDTs, {@code false} for every other type
 */
static boolean dateRelatedType(RelDataType type) {
  if (!(type instanceof ExprSqlType)) {
    return false;
  }
  // Compare directly instead of building a List.of(...) on every call: this runs inside a planner
  // predicate, and List.of(...).contains(null) would throw if the UDT were ever absent.
  ExprUDT udt = ((ExprSqlType) type).getUdt();
  return udt == ExprUDT.EXPR_DATE || udt == ExprUDT.EXPR_TIME || udt == ExprUDT.EXPR_TIMESTAMP;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.opensearch.sql.data.type.ExprCoreType.DATE;
import static org.opensearch.sql.data.type.ExprCoreType.TIME;
import static org.opensearch.sql.data.type.ExprCoreType.TIMESTAMP;
import static org.opensearch.sql.expression.function.PPLBuiltinOperators.WIDTH_BUCKET;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -59,6 +60,7 @@
import org.opensearch.search.aggregations.BucketOrder;
import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.opensearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.aggregations.metrics.ExtendedStats;
Expand All @@ -77,6 +79,7 @@
import org.opensearch.sql.opensearch.data.type.OpenSearchDataType;
import org.opensearch.sql.opensearch.request.PredicateAnalyzer.NamedFieldExpression;
import org.opensearch.sql.opensearch.response.agg.ArgMaxMinParser;
import org.opensearch.sql.opensearch.response.agg.BucketAggregationParser;
import org.opensearch.sql.opensearch.response.agg.CompositeAggregationParser;
import org.opensearch.sql.opensearch.response.agg.CountAsTotalHitsParser;
import org.opensearch.sql.opensearch.response.agg.MetricParser;
Expand Down Expand Up @@ -235,6 +238,19 @@ public static Pair<List<AggregationBuilder>, OpenSearchAggregationResponseParser
ImmutableList.copyOf(metricBuilder.getAggregatorFactories()),
new NoBucketAggregationParser(metricParserList));
}
} else if (aggregate.getGroupSet().length() == 1
&& isAutoDateSpan(project.getProjects().get(groupList.get(0)))) {
RexCall rexCall = (RexCall) project.getProjects().get(groupList.get(0));
String bucketName = project.getRowType().getFieldList().get(groupList.get(0)).getName();
RexInputRef rexInputRef = (RexInputRef) rexCall.getOperands().get(0);
RexLiteral valueLiteral = (RexLiteral) rexCall.getOperands().get(1);
ValuesSourceAggregationBuilder<?> bucketBuilder =
new AutoDateHistogramAggregationBuilder(bucketName)
.field(helper.inferNamedField(rexInputRef).getRootName())
.setNumBuckets(requireNonNull(valueLiteral.getValueAs(Integer.class)));
return Pair.of(
Collections.singletonList(bucketBuilder.subAggregations(metricBuilder)),
new BucketAggregationParser(metricParserList));
} else {
List<CompositeValuesSourceBuilder<?>> buckets =
createCompositeBuckets(groupList, project, helper);
Expand Down Expand Up @@ -296,7 +312,7 @@ private static Pair<Builder, List<MetricParser>> processAggregateCalls(
List<String> aggFieldNames,
List<AggregateCall> aggCalls,
Project project,
AggregateBuilderHelper helper)
AggregateAnalyzer.AggregateBuilderHelper helper)
throws PredicateAnalyzer.ExpressionNotAnalyzableException {
Builder metricBuilder = new AggregatorFactories.Builder();
List<MetricParser> metricParserList = new ArrayList<>();
Expand Down Expand Up @@ -491,7 +507,7 @@ private static Pair<AggregationBuilder, MetricParser> createRegularAggregation(
String.format("Unsupported push-down aggregator %s", aggCall.getAggregation()));
}
default:
throw new AggregateAnalyzerException(
throw new AggregateAnalyzer.AggregateAnalyzerException(
String.format("unsupported aggregator %s", aggCall.getAggregation()));
}
}
Expand Down Expand Up @@ -521,6 +537,12 @@ private static List<CompositeValuesSourceBuilder<?>> createCompositeBuckets(
return resultBuilder.build();
}

/**
 * Tells whether {@code rex} is a call of kind {@code OTHER_FUNCTION} whose operator is {@code
 * WIDTH_BUCKET}.
 *
 * <p>Only the operator is checked; the type of the bucketed operand is not inspected here.
 *
 * @param rex the expression to inspect
 * @return {@code true} if {@code rex} is a {@code WIDTH_BUCKET} call
 */
private static boolean isAutoDateSpan(RexNode rex) {
  if (!(rex instanceof RexCall)) {
    return false;
  }
  RexCall call = (RexCall) rex;
  return call.getKind() == SqlKind.OTHER_FUNCTION && call.getOperator().equals(WIDTH_BUCKET);
}

private static ValuesSourceAggregationBuilder<?> createBucket(
Integer groupIndex, Project project, AggregateBuilderHelper helper) {
RexNode rex = project.getProjects().get(groupIndex);
Expand All @@ -543,7 +565,7 @@ private static ValuesSourceAggregationBuilder<?> createBucket(
}

private static CompositeValuesSourceBuilder<?> createCompositeBucket(
Integer groupIndex, Project project, AggregateBuilderHelper helper) {
Integer groupIndex, Project project, AggregateAnalyzer.AggregateBuilderHelper helper) {
RexNode rex = project.getProjects().get(groupIndex);
String bucketName = project.getRowType().getFieldList().get(groupIndex).getName();
if (rex instanceof RexCall
Expand All @@ -560,6 +582,10 @@ private static CompositeValuesSourceBuilder<?> createCompositeBucket(
SpanUnit.of(((RexLiteral)((RexCall) rex).getOperands().get(2)).getValueAs(String.class)),
MissingOrder.FIRST,
helper.bucketNullable);
} else if (isAutoDateSpan(rex)) {
// Defensive check: OpenSearchAggregateIndexScanRule already prevents this case.
throw new UnsupportedOperationException(
"auto_date_histogram is not supported in composite agg.");
} else {
return createTermsSourceBuilder(bucketName, rex, helper);
}
Expand Down
Loading
Loading