diff --git a/docs/changelog/140475.yaml b/docs/changelog/140475.yaml new file mode 100644 index 0000000000000..183c8ab3379a0 --- /dev/null +++ b/docs/changelog/140475.yaml @@ -0,0 +1,5 @@ +pr: 140475 +summary: Partition time-series source +area: ES|QL +type: enhancement +issues: [] diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java index 0181882515f9c..4099ad74c0b0d 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java @@ -52,7 +52,7 @@ @ThreadLeakFilters(filters = TestClustersThreadFilter.class) public class MultiClusterTimeSeriesIT extends ESRestTestCase { - static final List REQUIRED_CAPABILITIES = List.of("ts_command_v0"); + static final List REQUIRED_CAPABILITIES = List.of("ts_command_v0", "rate_fix_resets_multiple_segments"); static ElasticsearchCluster remoteCluster = Clusters.remoteCluster(); static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 9c59e1167c7ca..f0af67c6a52a6 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -588,6 +588,7 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() { public static final PlannerSettings TEST_PLANNER_SETTINGS = new PlannerSettings( DataPartitioning.AUTO, ByteSizeValue.ofMb(1), + ByteSizeValue.ofMb(2), 10_000, ByteSizeValue.ofMb(1) ); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec index de72bebba001e..dce200ba24810 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec @@ -24,6 +24,7 @@ max_bytes:long | cluster: keyword maxRate required_capability: ts_command_v0 required_capability: double_quotes_source_enclosing +required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max=max(rate(network.total_bytes_in)) | EVAL max=ROUND(max, 6) | KEEP max; max: double @@ -33,6 +34,7 @@ max: double maxRatePerMinute required_capability: ts_command_v0 required_capability: double_quotes_source_enclosing +required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max=max(60 * rate(network.total_bytes_in)) | EVAL max=ROUND(max, 6) | KEEP max; max: double @@ -41,6 +43,7 @@ max: double maxCost required_capability: ts_command_v0 +required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_cost=max(rate(network.total_cost)) | EVAL max_cost=ROUND(max_cost, 6) | KEEP max_cost; max_cost: double @@ -49,6 +52,7 @@ max_cost: double maxRateAndBytes required_capability: ts_command_v0 +required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_rate=max(60 * rate(network.total_bytes_in)), max_bytes=max(network.bytes_in) | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, max_bytes; max_rate: double | max_bytes: long @@ -57,6 +61,7 @@ max_rate: double | max_bytes: long maxRateAndBytesExplicit required_capability: ts_command_v0 +required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_rate=max(60 * rate(network.total_bytes_in)), max_bytes=max(last_over_time(network.bytes_in)) | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, max_bytes; max_rate: double | max_bytes: long @@ -65,6 +70,7 @@ max_rate: double | max_bytes: long maxRateAndMarkupBytes required_capability: ts_command_v0 +required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_rate=max(rate(network.total_bytes_in)), max_markup=max(network.bytes_in * 1.05) | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, max_markup; max_rate: double | max_markup: double @@ -73,6 +79,7 @@ max_rate: double | max_markup: double maxRateAndMarkupBytesExplicit required_capability: ts_command_v0 +required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_rate=max(rate(network.total_bytes_in)), max_bytes_in = max(last_over_time(network.bytes_in) * 1.05) | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, max_bytes_in; max_rate: double | max_bytes_in: double @@ -81,6 +88,7 @@ max_rate: double | max_bytes_in: double maxRateAndLastBytesIn required_capability: ts_command_v0 +required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_rate=max(rate(network.total_bytes_in)), max_bytes_in = max(last_over_time(network.bytes_in * 1.05)) | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, max_bytes_in; max_rate: double | max_bytes_in: double @@ -89,6 +97,7 @@ max_rate: double | max_bytes_in: double maxRateAndBytesAndCost required_capability: ts_command_v0 +required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_rate=max(rate(network.total_bytes_in)), max_bytes=max(max_over_time(network.bytes_in)), max_cost=max(rate(network.total_cost)) | EVAL max_rate=ROUND(max_rate, 6), max_cost=ROUND(max_cost, 6) | KEEP max_rate, max_bytes, max_cost; max_rate:double | max_bytes:long | max_cost:double @@ -97,6 +106,7 @@ max_rate:double | max_bytes:long | max_cost:double sumRate required_capability: ts_command_v0 +required_capability: rate_fix_resets_multiple_segments TS k8s | STATS bytes=sum(rate(network.total_bytes_in)), sum_cost=sum(rate(network.total_cost)) BY cluster | EVAL bytes=ROUND(bytes, 6), sum_cost=ROUND(sum_cost, 6) | KEEP bytes, sum_cost, cluster | SORT cluster; bytes: double | sum_cost: double | cluster: keyword @@ -108,6 +118,7 @@ bytes: double | sum_cost: double | cluster: keyword oneRateWithBucket required_capability: ts_command_v0 required_capability: rate_with_interpolation_v2 +required_capability: rate_fix_resets_multiple_segments // tag::rate[] TS k8s @@ -125,8 +136,8 @@ max_rate: double | time_bucket:date oneRateWithTBucket required_capability: ts_command_v0 -required_capability: tbucket required_capability: rate_with_interpolation_v2 +required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_rate=max(rate(network.total_bytes_in)) BY time_bucket = tbucket(5minute) @@ -141,6 +152,7 @@ max_rate: double | time_bucket:date oneRateWithPromql required_capability: promql_pre_tech_preview_v7 required_capability: rate_with_interpolation_v2 +required_capability: rate_fix_resets_multiple_segments PROMQL index=k8s step=5m max(rate(network.total_bytes_in[5m])) | SORT step DESC | LIMIT 2; @@ -153,7 +165,8 @@ max(rate(network.total_bytes_in[5m])):double | step:datetime oneRateWithSingleTBucket required_capability: ts_command_v0 -required_capability: tbucket +required_capability: rate_fix_resets_multiple_segments + TS k8s | STATS max_rate=max(rate(network.total_bytes_in)) BY time_bucket = tbucket(1h) | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, time_bucket; @@ -164,6 +177,7 @@ max_rate: double | time_bucket:date oneRateWithSingleStepPromql required_capability: promql_pre_tech_preview_v7 +required_capability: rate_fix_resets_multiple_segments PROMQL index=k8s step=1h max(rate(network.total_bytes_in[1h])); max(rate(network.total_bytes_in[1h])):double | step:datetime @@ -174,6 +188,7 @@ max(rate(network.total_bytes_in[1h])):double | step:datetime twoRatesWithBucket required_capability: ts_command_v0 required_capability: rate_with_interpolation_v2 +required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_rate=max(rate(network.total_bytes_in)), sum_rate=sum(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,5minute) | EVAL max_rate=ROUND(max_rate, 6), sum_rate=ROUND(sum_rate, 6) | KEEP max_rate, sum_rate, time_bucket | SORT time_bucket DESC | LIMIT 3; @@ -185,8 +200,9 @@ max_rate:double | sum_rate:double | time_bucket:datetime twoRatesWithTBucket required_capability: ts_command_v0 -required_capability: tbucket required_capability: rate_with_interpolation_v2 +required_capability: rate_fix_resets_multiple_segments + TS k8s | STATS max_rate=max(rate(network.total_bytes_in)), sum_rate=sum(rate(network.total_bytes_in)) BY time_bucket = tbucket(5minute) | EVAL max_rate=ROUND(max_rate, 6), sum_rate=ROUND(sum_rate, 6) | KEEP max_rate, sum_rate, time_bucket | SORT time_bucket DESC | LIMIT 3; max_rate:double | sum_rate:double | time_bucket:datetime @@ -198,6 +214,8 @@ max_rate:double | sum_rate:double | time_bucket:datetime oneRateWithBucketAndCluster required_capability: ts_command_v0 required_capability: rate_with_interpolation_v2 +required_capability: rate_fix_resets_multiple_segments + TS k8s | STATS max_rate=max(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,5minute), cluster | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, time_bucket, cluster | SORT time_bucket DESC, cluster | LIMIT 6; max_rate:double | time_bucket:datetime | cluster:keyword @@ -212,6 +230,7 @@ max_rate:double | time_bucket:datetime | cluster:keyword BytesAndCostByBucketAndCluster required_capability: ts_command_v0 required_capability: rate_with_interpolation_v2 +required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_rate=max(rate(network.total_bytes_in)), max_cost=max(max_over_time(network.cost)) BY time_bucket = bucket(@timestamp,5minute), cluster | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, max_cost, time_bucket, cluster | SORT time_bucket DESC, cluster | LIMIT 6; @@ -228,6 +247,7 @@ max_rate:double | max_cost:double | time_bucket:date | cluster: keyword oneRateWithBucketAndClusterThenFilter required_capability: ts_command_v0 required_capability: rate_with_interpolation_v2 +required_capability: rate_fix_resets_multiple_segments TS k8s | WHERE cluster=="prod" | STATS max_rate=max(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,5minute), cluster | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, time_bucket, cluster | SORT time_bucket DESC | LIMIT 3; @@ -240,6 +260,8 @@ max_rate:double | time_bucket:datetime | cluster:keyword maxRateWithInlineFilter required_capability: ts_command_v0 +required_capability: rate_fix_resets_multiple_segments + TS k8s | STATS max_rate = max(rate(network.total_bytes_in)) WHERE cluster=="prod" BY cluster | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, cluster | SORT cluster; max_rate:double | cluster:keyword @@ -250,6 +272,8 @@ null | staging maxRateWithPreFilter required_capability: ts_command_v0 +required_capability: rate_fix_resets_multiple_segments + TS k8s | WHERE cluster=="prod" | STATS max_rate = max(rate(network.total_bytes_in)) BY cluster | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, cluster | SORT cluster; max_rate:double | cluster:keyword @@ -259,6 +283,7 @@ max_rate:double | cluster:keyword notEnoughSamples required_capability: ts_command_v0 required_capability: rate_with_interpolation +required_capability: rate_fix_resets_multiple_segments TS k8s | WHERE @timestamp <= "2024-05-10T00:06:14.000Z" | STATS max_rate=max(rate(network.total_bytes_in)) BY pod, time_bucket = bucket(@timestamp,1minute) | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, pod, time_bucket | SORT pod, time_bucket DESC | LIMIT 10; @@ -516,6 +541,7 @@ distincts:long | distincts_imprecise:long | cluster:keyword | time_bucket:dateti two_rates required_capability: ts_command_v0 required_capability: rate_with_interpolation +required_capability: rate_fix_resets_multiple_segments TS k8s | STATS cost_per_mb=max(rate(network.total_bytes_in) / 1024 * 1024 * rate(network.total_cost)) BY cluster, time_bucket = bucket(@timestamp,5minute) | EVAL cost_per_mb=ROUND(cost_per_mb, 6) | KEEP cost_per_mb, cluster, time_bucket | SORT cost_per_mb DESC, cluster, time_bucket DESC | LIMIT 5; @@ -716,6 +742,7 @@ sd:double | var:double | sd_squared:double | cluster:keyword | time_bucket:datet Max of Rate with Bucket required_capability: ts_command_v0 +required_capability: rate_fix_resets_multiple_segments TS k8s | STATS maxRate = max(rate(network.total_cost)) BY tbucket = bucket(@timestamp, 1hour) @@ -727,6 +754,7 @@ maxRate:double | tbucket:datetime Max of Rate with Bucket, rename _tsid required_capability: ts_command_v0 +required_capability: rate_fix_resets_multiple_segments TS k8s METADATA _tsid | RENAME _tsid AS newTsid @@ -741,6 +769,7 @@ maxRate:double | tbucket:datetime Max of Rate with Bucket, drop _tsid required_capability: ts_command_v0 +required_capability: rate_fix_resets_multiple_segments TS k8s METADATA _tsid | DROP _tsid @@ -943,6 +972,7 @@ max:double | inc:double | time_bucket:date_nanos | cluster:keyword dateNanosOneRateIncreaseWithBucketAndClusterThenFilter required_capability: ts_command_v0 required_capability: ts_rate_datenanos_2 +required_capability: rate_fix_resets_multiple_segments TS datenanos-k8s | WHERE cluster=="prod" @@ -991,6 +1021,7 @@ required_capability: ts_command_v0 required_capability: ts_linreg_derivative required_capability: ts_deriv_datenanos required_capability: rate_with_interpolation_v2 +required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_deriv = max(deriv(to_long(network.total_bytes_in))), max_rate = max(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,5minute), cluster diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index a92b209b9f825..2fec5bef3f907 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1837,6 +1837,11 @@ public enum Cap { */ CONDITIONAL_BLOCK_LOADER_FOR_TEXT_FIELDS, + /** + * Fixes reset calculation in rates where partitioning data into multiple slices can lead to incorrect results. + */ + RATE_FIX_RESETS_MULTIPLE_SEGMENTS, + // Last capability should still have a comma for fewer merge conflicts when adding new ones :) // This comment prevents the semicolon from being on the previous capability when Spotless formats the file. ; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java index 6da74fdd564bb..38854292dd7a4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java @@ -7,16 +7,20 @@ package org.elasticsearch.xpack.esql.optimizer.rules.physical.local; +import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.xpack.esql.core.expression.Alias; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.AttributeMap; import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; import org.elasticsearch.xpack.esql.core.expression.predicate.operator.comparison.BinaryComparison; import org.elasticsearch.xpack.esql.core.querydsl.query.Query; import org.elasticsearch.xpack.esql.core.util.CollectionUtils; +import org.elasticsearch.xpack.esql.core.util.Holder; import org.elasticsearch.xpack.esql.core.util.Queries; +import org.elasticsearch.xpack.esql.expression.function.TimestampAware; import org.elasticsearch.xpack.esql.expression.predicate.Predicates; import org.elasticsearch.xpack.esql.expression.predicate.Range; import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.EsqlBinaryComparison; @@ -25,11 +29,12 @@ import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.LessThan; import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.LessThanOrEqual; import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext; -import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; import org.elasticsearch.xpack.esql.plan.physical.EvalExec; import org.elasticsearch.xpack.esql.plan.physical.FilterExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec; +import org.elasticsearch.xpack.esql.rule.ParameterizedRule; import java.util.ArrayList; import java.util.List; @@ -39,20 +44,27 @@ import static org.elasticsearch.xpack.esql.expression.predicate.Predicates.splitAnd; import static org.elasticsearch.xpack.esql.planner.TranslatorHandler.TRANSLATOR_HANDLER; -public class PushFiltersToSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule { +public class PushFiltersToSource extends ParameterizedRule { @Override - protected PhysicalPlan rule(FilterExec filterExec, LocalPhysicalOptimizerContext ctx) { - PhysicalPlan plan = filterExec; - if (filterExec.child() instanceof EsQueryExec queryExec) { - plan = planFilterExec(filterExec, queryExec, ctx); - } else if (filterExec.child() instanceof EvalExec evalExec && evalExec.child() instanceof EsQueryExec queryExec) { - plan = planFilterExec(filterExec, evalExec, queryExec, ctx); - } - return plan; + public PhysicalPlan apply(PhysicalPlan rootPlan, LocalPhysicalOptimizerContext ctx) { + return rootPlan.transformDown(FilterExec.class, filterExec -> { + if (filterExec.child() instanceof EsQueryExec queryExec) { + return planFilterExec(filterExec, queryExec, ctx, timestampFieldForTimeSeries(rootPlan, queryExec)); + } else if (filterExec.child() instanceof EvalExec evalExec && evalExec.child() instanceof EsQueryExec queryExec) { + return planFilterExec(filterExec, evalExec, queryExec, ctx, timestampFieldForTimeSeries(rootPlan, queryExec)); + } else { + return filterExec; + } + }); } - private static PhysicalPlan planFilterExec(FilterExec filterExec, EsQueryExec queryExec, LocalPhysicalOptimizerContext ctx) { + private static PhysicalPlan planFilterExec( + FilterExec filterExec, + EsQueryExec queryExec, + LocalPhysicalOptimizerContext ctx, + Expression timestampField + ) { LucenePushdownPredicates pushdownPredicates = LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags()); List pushable = new ArrayList<>(); List nonPushable = new ArrayList<>(); @@ -66,14 +78,15 @@ private static PhysicalPlan planFilterExec(FilterExec filterExec, EsQueryExec qu } } } - return rewrite(pushdownPredicates, filterExec, queryExec, pushable, nonPushable, List.of()); + return rewrite(ctx, pushdownPredicates, filterExec, queryExec, pushable, nonPushable, List.of(), timestampField); } private static PhysicalPlan planFilterExec( FilterExec filterExec, EvalExec evalExec, EsQueryExec queryExec, - LocalPhysicalOptimizerContext ctx + LocalPhysicalOptimizerContext ctx, + Expression timestampField ) { LucenePushdownPredicates pushdownPredicates = LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags()); AttributeMap aliasReplacedBy = getAliasReplacedBy(evalExec); @@ -92,7 +105,7 @@ private static PhysicalPlan planFilterExec( } // Replace field references with their actual field attributes pushable.replaceAll(e -> e.transformDown(ReferenceAttribute.class, r -> aliasReplacedBy.resolve(r, r))); - return rewrite(pushdownPredicates, filterExec, queryExec, pushable, nonPushable, evalExec.fields()); + return rewrite(ctx, pushdownPredicates, filterExec, queryExec, pushable, nonPushable, evalExec.fields(), timestampField); } static AttributeMap getAliasReplacedBy(EvalExec evalExec) { @@ -106,44 +119,60 @@ static AttributeMap getAliasReplacedBy(EvalExec evalExec) { } private static PhysicalPlan rewrite( + LocalPhysicalOptimizerContext ctx, LucenePushdownPredicates pushdownPredicates, FilterExec filterExec, EsQueryExec queryExec, List pushable, List nonPushable, - List evalFields + List evalFields, + Expression timestampField ) { - // Combine GT, GTE, LT and LTE in pushable to Range if possible - List newPushable = combineEligiblePushableToRange(pushable); - if (newPushable.size() > 0) { // update the executable with pushable conditions - Query queryDSL = TRANSLATOR_HANDLER.asQuery(pushdownPredicates, Predicates.combineAnd(newPushable)); - QueryBuilder planQuery = queryDSL.toQueryBuilder(); - Queries.Clause combiningQueryClauseType = queryExec.hasScoring() ? Queries.Clause.MUST : Queries.Clause.FILTER; - var query = Queries.combine(combiningQueryClauseType, asList(queryExec.query(), planQuery)); - queryExec = new EsQueryExec( - queryExec.source(), - queryExec.indexPattern(), - queryExec.indexMode(), - queryExec.output(), - queryExec.limit(), - queryExec.sorts(), - queryExec.estimatedRowSize(), - List.of(new EsQueryExec.QueryBuilderAndTags(query, List.of())) + EsQueryExec newQueryExec = null; + if (queryExec.indexMode() == IndexMode.TIME_SERIES && timestampField != null) { + newQueryExec = TimeSeriesSourcePartitioner.partitionTimeSeriesSource( + ctx, + timestampField, + pushdownPredicates, + queryExec, + pushable ); + } + if (newQueryExec == null) { + // Combine GT, GTE, LT and LTE in pushable to Range if possible + List newPushable = combineEligiblePushableToRange(pushable); + if (newPushable.isEmpty() == false) { + Query queryDSL = TRANSLATOR_HANDLER.asQuery(pushdownPredicates, Predicates.combineAnd(newPushable)); + QueryBuilder planQuery = queryDSL.toQueryBuilder(); + Queries.Clause combiningQueryClauseType = queryExec.hasScoring() ? Queries.Clause.MUST : Queries.Clause.FILTER; + var query = Queries.combine(combiningQueryClauseType, asList(queryExec.query(), planQuery)); + newQueryExec = new EsQueryExec( + queryExec.source(), + queryExec.indexPattern(), + queryExec.indexMode(), + queryExec.output(), + queryExec.limit(), + queryExec.sorts(), + queryExec.estimatedRowSize(), + List.of(new EsQueryExec.QueryBuilderAndTags(query, List.of())) + ); + } + } + if (newQueryExec != null) { // If the eval contains other aliases, not just field attributes, we need to keep them in the plan - PhysicalPlan plan = evalFields.isEmpty() ? queryExec : new EvalExec(filterExec.source(), queryExec, evalFields); - if (nonPushable.size() > 0) { - // update filter with remaining non-pushable conditions - return new FilterExec(filterExec.source(), plan, Predicates.combineAnd(nonPushable)); - } else { + PhysicalPlan plan = evalFields.isEmpty() ? newQueryExec : new EvalExec(filterExec.source(), newQueryExec, evalFields); + if (nonPushable.isEmpty()) { // prune Filter entirely return plan; + } else { + // update filter with remaining non-pushable conditions + return new FilterExec(filterExec.source(), plan, Predicates.combineAnd(nonPushable)); } - } // else: nothing changes + } return filterExec; } - private static List combineEligiblePushableToRange(List pushable) { + static List combineEligiblePushableToRange(List pushable) { List bcs = new ArrayList<>(); List ranges = new ArrayList<>(); List others = new ArrayList<>(); @@ -156,6 +185,8 @@ private static List combineEligiblePushableToRange(List } else { others.add(e); } + } else if (e instanceof Range r) { + ranges.add(r); } else { others.add(e); } @@ -215,4 +246,22 @@ else if ((other instanceof GreaterThan || other instanceof GreaterThanOrEqual) } return changed ? CollectionUtils.combine(others, bcs, ranges) : pushable; } + + private static Expression timestampFieldForTimeSeries(PhysicalPlan plan, EsQueryExec queryExec) { + if (queryExec.indexMode() != IndexMode.TIME_SERIES) { + return null; + } + Holder timestampHolder = new Holder<>(); + plan.forEachDown(TimeSeriesAggregateExec.class, ts -> { + for (NamedExpression agg : ts.aggregates()) { + if (Alias.unwrap(agg) instanceof TimestampAware timestampAware) { + if (timestampAware.timestamp() != null) { + timestampHolder.set(timestampAware.timestamp()); + return; + } + } + } + }); + return timestampHolder.get(); + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/TimeSeriesSourcePartitioner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/TimeSeriesSourcePartitioner.java new file mode 100644 index 0000000000000..55e7072959792 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/TimeSeriesSourcePartitioner.java @@ -0,0 +1,269 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer.rules.physical.local; + +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; +import org.elasticsearch.xpack.esql.core.expression.Literal; +import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute; +import org.elasticsearch.xpack.esql.core.querydsl.query.Query; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.core.util.Queries; +import org.elasticsearch.xpack.esql.expression.predicate.Predicates; +import org.elasticsearch.xpack.esql.expression.predicate.Range; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.Equals; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.EsqlBinaryComparison; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThan; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThanOrEqual; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.LessThan; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.LessThanOrEqual; +import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext; +import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; + +import java.util.ArrayList; +import java.util.List; + +import static java.util.Arrays.asList; +import static org.elasticsearch.xpack.esql.planner.TranslatorHandler.TRANSLATOR_HANDLER; + +/** + * Partitions time-series data source queries into multiple queries based on time intervals. + * The ideal number of slices should be close to the number of available processors. + * Too many slices may lead to high overhead; too few slices may cause under-utilization of CPUs + * and require high memory to buffer data points for rate calculations. + * Prefer partitioning the query interval into small, fixed, and predefined intervals to improve cache hit rate. + */ +final class TimeSeriesSourcePartitioner { + + static EsQueryExec partitionTimeSeriesSource( + LocalPhysicalOptimizerContext ctx, + Expression timestampField, + LucenePushdownPredicates pushdownPredicates, + EsQueryExec queryExec, + List pushable + ) { + // TODO: support date_nanos + if (timestampField == null || timestampField.dataType() != DataType.DATETIME) { + return null; + } + if (pushdownPredicates.isPushableFieldAttribute(timestampField) == false) { + return null; + } + Object minTimestampFromData = ctx.searchStats().min(new FieldAttribute.FieldName(MetadataAttribute.TIMESTAMP_FIELD)); + Object maxTimestampFromData = ctx.searchStats().max(new FieldAttribute.FieldName(MetadataAttribute.TIMESTAMP_FIELD)); + if (minTimestampFromData instanceof Long minTs && maxTimestampFromData instanceof Long maxTs) { + maxTs++; // maxTs from data is inclusive, make it exclusive + long dataInterval = maxTs - minTs; + Tuple minMaxFromPredicates = minMaxTimestampFromQuery(timestampField, pushable); + minTs = Math.max(minTs, minMaxFromPredicates.v1()); + maxTs = Math.min(maxTs, minMaxFromPredicates.v2()); + var filters = partitionFiltersByTimeIntervals(ctx, timestampField, dataInterval, minTs, maxTs); + if (filters.isEmpty() || filters.size() == 1) { + return null; + } + return pushDownFiltersAndPartitionFilters( + pushdownPredicates, + queryExec, + removeTimestampFilters(timestampField, pushable), + filters + ); + } + return null; + } + + private static EsQueryExec pushDownFiltersAndPartitionFilters( + LucenePushdownPredicates pushdownPredicates, + EsQueryExec queryExec, + List pushable, + List partitionFilters + ) { + List queryAndTags = new ArrayList<>(); + QueryBuilder mainQuery = queryExec.query(); + for (Range partition : partitionFilters) { + var newPushable = PushFiltersToSource.combineEligiblePushableToRange(CollectionUtils.appendToCopy(pushable, partition)); + Query queryDSL = TRANSLATOR_HANDLER.asQuery(pushdownPredicates, Predicates.combineAnd(newPushable)); + QueryBuilder planQuery = queryDSL.toQueryBuilder(); + QueryBuilder partitionQuery = Queries.combine(Queries.Clause.FILTER, asList(mainQuery, planQuery)); + queryAndTags.add(new EsQueryExec.QueryBuilderAndTags(partitionQuery, List.of())); + } + return new EsQueryExec( + queryExec.source(), + queryExec.indexPattern(), + queryExec.indexMode(), + queryExec.output(), + queryExec.limit(), + queryExec.sorts(), + queryExec.estimatedRowSize(), + queryAndTags + ); + } + + private static List removeTimestampFilters(Expression timestampField, List pushable) { + List removed = new ArrayList<>(pushable.size()); + for (Expression expr : pushable) { + if (expr instanceof EsqlBinaryComparison bin + && bin.right().foldable() + && bin.right() instanceof Literal l + && l.value() instanceof Long + && bin.left().dataType() == timestampField.dataType() + && bin.left().semanticEquals(timestampField)) { + continue; + } + if (expr instanceof Range r && r.value().dataType() == timestampField.dataType() && r.value().semanticEquals(timestampField)) { + continue; + } + removed.add(expr); + } + return removed; + } + + private static List partitionFiltersByTimeIntervals( + LocalPhysicalOptimizerContext ctx, + Expression timestampField, + long dataInterval, + long queryStartTs, + long queryEndTs + ) { + long maxRateBufferBytes = ctx.plannerSettings().getRateBufferSize().getBytes(); + int taskConcurrency = ctx.configuration().pragmas().taskConcurrency(); + long queryInterval = queryEndTs - queryStartTs; + // The number of data points should be based on the number of docs with the counter field, not the total docs. + // Here we estimate that by assuming the counter field appears in 10% of documents on average. + // TODO: access the TSDB codec for the accurate estimate + long totalDocs = ctx.searchStats().count() / 10; + long queryDocs = totalDocs * queryInterval / dataInterval; + long selectedInterval = -1; + long selectedSlices = -1; + // TODO: retrieve this from planner settings instead + final int availableProcessors = Math.ceilDiv(taskConcurrency * 2, 3); + for (int i = TIME_SERIES_PARTITION_INTERVALS.length - 1; i >= 0; i--) { + long sliceInterval = TIME_SERIES_PARTITION_INTERVALS[i]; + long numSlices = Math.ceilDiv(queryInterval, sliceInterval); + long docPerSlice = queryDocs / numSlices; + long requiredBytes = docPerSlice * 2 * Long.BYTES * Math.min(availableProcessors, numSlices); + if (requiredBytes <= maxRateBufferBytes) { + if (selectedInterval == -1 + || numSlices <= availableProcessors + || (selectedSlices < availableProcessors && numSlices <= taskConcurrency)) { + selectedInterval = sliceInterval; + selectedSlices = numSlices; + } else { + break; + } + } + } + if (selectedInterval == -1) { + return List.of(); + } + List filters = new ArrayList<>(); + // round the intervals to improve caching + var pt = (queryStartTs + selectedInterval - 1) / selectedInterval * selectedInterval; + if (queryStartTs < pt) { + filters.add( + new Range( + timestampField.source(), + timestampField, + new Literal(timestampField.source(), queryStartTs, timestampField.dataType()), + true, + new Literal(timestampField.source(), pt, timestampField.dataType()), + false, + ctx.configuration().zoneId() + ) + ); + } + while (pt < queryEndTs) { + long nextPt = Math.min(pt + selectedInterval, queryEndTs); + filters.add( + new Range( + timestampField.source(), + timestampField, + new Literal(timestampField.source(), pt, timestampField.dataType()), + true, + new Literal(timestampField.source(), nextPt, timestampField.dataType()), + false, + ctx.configuration().zoneId() + ) + ); + pt = nextPt; + } + return filters; + } + + private static Tuple minMaxTimestampFromQuery(Expression timestampField, List expressions) { + long minTimestamp = Long.MIN_VALUE; + long maxTimestamp = Long.MAX_VALUE; + for (Expression expr : expressions) { + if (expr instanceof EsqlBinaryComparison bin + && bin.right().foldable() + && bin.right() instanceof Literal l + && l.value() instanceof Long v + && bin.left().dataType() == timestampField.dataType() + && bin.left().semanticEquals(timestampField)) { + switch (expr) { + case Equals unused -> { + minTimestamp = Math.max(minTimestamp, v); + maxTimestamp = Math.min(maxTimestamp, v + 1); + } + case GreaterThan unused -> minTimestamp = Math.max(minTimestamp, v + 1); + case GreaterThanOrEqual unused -> minTimestamp = Math.max(minTimestamp, v); + case LessThan unused -> maxTimestamp = Math.min(maxTimestamp, v); + case LessThanOrEqual unused -> maxTimestamp = Math.min(maxTimestamp, v + 1); + default -> { + } + } + } + if (expr instanceof Range r && r.value().dataType() == timestampField.dataType() && r.value().semanticEquals(timestampField)) { + if (r.lower() instanceof Literal lowerLit && lowerLit.value() instanceof Long lowerVal) { + if (r.includeLower()) { + minTimestamp = Math.max(minTimestamp, lowerVal); + } else { + minTimestamp = Math.max(minTimestamp, lowerVal + 1); + } + } + if (r.upper() instanceof Literal upperLit && upperLit.value() instanceof Long upperVal) { + if (r.includeUpper()) { + maxTimestamp = Math.min(maxTimestamp, upperVal + 1); + } else { + maxTimestamp = Math.min(maxTimestamp, upperVal); + } + } + } + } + return Tuple.tuple(minTimestamp, maxTimestamp); + } + + static final long[] TIME_SERIES_PARTITION_INTERVALS = new long[] { + TimeValue.timeValueSeconds(1).millis(), + TimeValue.timeValueSeconds(2).millis(), + TimeValue.timeValueSeconds(5).millis(), + TimeValue.timeValueSeconds(10).millis(), + TimeValue.timeValueSeconds(20).millis(), + TimeValue.timeValueSeconds(30).millis(), + + TimeValue.timeValueMinutes(1).millis(), + TimeValue.timeValueMinutes(2).millis(), + TimeValue.timeValueMinutes(3).millis(), + TimeValue.timeValueMinutes(5).millis(), + TimeValue.timeValueMinutes(10).millis(), + TimeValue.timeValueMinutes(15).millis(), + TimeValue.timeValueMinutes(20).millis(), + TimeValue.timeValueMinutes(30).millis(), + + TimeValue.timeValueHours(1).millis(), + TimeValue.timeValueHours(2).millis(), + TimeValue.timeValueHours(3).millis(), + TimeValue.timeValueHours(5).millis(), + TimeValue.timeValueHours(12).millis(), + + TimeValue.timeValueDays(1).millis(), + TimeValue.timeValueDays(7).millis() }; +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java index 426dd032df91c..51b358590b418 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java @@ -290,8 +290,11 @@ public List queryBuilderAndTags() { } public boolean canSubstituteRoundToWithQueryBuilderAndTags() { + boolean queryWithoutTag = queryBuilderAndTags == null + || queryBuilderAndTags.isEmpty() + || (queryBuilderAndTags.size() == 1 && queryBuilderAndTags.getFirst().tags.isEmpty()); // LuceneTopNSourceOperator doesn't support QueryAndTags - return sorts == null || sorts.isEmpty(); + return queryWithoutTag && (sorts == null || sorts.isEmpty()); } /** diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerSettings.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerSettings.java index 7225399aa14b3..18747a311b7e2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerSettings.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerSettings.java @@ -37,6 +37,15 @@ public class PlannerSettings { Setting.Property.Dynamic ); + public static final Setting RATE_BUFFER_SIZE = new Setting<>("esql.rate_buffer_size", settings -> { + long oneTenth = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() / 10; + return ByteSizeValue.ofBytes(Math.max(oneTenth, ByteSizeValue.ofMb(1).getBytes())).getStringRep(); + }, + s -> MemorySizeValue.parseBytesSizeValueOrHeapRatio(s, "esql.rate_buffer_size"), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + public static final Setting LUCENE_TOPN_LIMIT = Setting.intSetting( "esql.lucene_topn_limit", IndexSettings.MAX_RESULT_WINDOW_SETTING.getDefault(Settings.EMPTY), @@ -61,6 +70,7 @@ public class PlannerSettings { private volatile DataPartitioning defaultDataPartitioning; private volatile ByteSizeValue valuesLoadingJumboSize; + private volatile ByteSizeValue rateBufferSize; private volatile int luceneTopNLimit; private volatile ByteSizeValue intermediateLocalRelationMaxSize; @@ -71,6 +81,7 @@ public PlannerSettings(ClusterService clusterService) { var clusterSettings = clusterService.getClusterSettings(); clusterSettings.initializeAndWatch(DEFAULT_DATA_PARTITIONING, v -> this.defaultDataPartitioning = v); clusterSettings.initializeAndWatch(VALUES_LOADING_JUMBO_SIZE, v -> this.valuesLoadingJumboSize = v); + clusterSettings.initializeAndWatch(RATE_BUFFER_SIZE, v -> this.rateBufferSize = v); clusterSettings.initializeAndWatch(LUCENE_TOPN_LIMIT, v -> this.luceneTopNLimit = v); clusterSettings.initializeAndWatch(INTERMEDIATE_LOCAL_RELATION_MAX_SIZE, v -> this.intermediateLocalRelationMaxSize = v); } @@ -81,11 +92,13 @@ public PlannerSettings(ClusterService clusterService) { public PlannerSettings( DataPartitioning defaultDataPartitioning, ByteSizeValue valuesLoadingJumboSize, + ByteSizeValue rateBufferSize, int luceneTopNLimit, ByteSizeValue intermediateLocalRelationMaxSize ) { this.defaultDataPartitioning = defaultDataPartitioning; this.valuesLoadingJumboSize = valuesLoadingJumboSize; + this.rateBufferSize = rateBufferSize; this.luceneTopNLimit = luceneTopNLimit; this.intermediateLocalRelationMaxSize = intermediateLocalRelationMaxSize; } @@ -119,4 +132,11 @@ public int luceneTopNLimit() { public ByteSizeValue intermediateLocalRelationMaxSize() { return intermediateLocalRelationMaxSize; } + + /** + * Returns the memory size allocated for buffering data points during rate calculations. + */ + public ByteSizeValue getRateBufferSize() { + return rateBufferSize; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 85635a33c8fa1..388dd0d48a137 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -255,6 +255,7 @@ public List> getSettings() { ESQL_QUERYLOG_INCLUDE_USER_SETTING, PlannerSettings.DEFAULT_DATA_PARTITIONING, PlannerSettings.VALUES_LOADING_JUMBO_SIZE, + PlannerSettings.RATE_BUFFER_SIZE, PlannerSettings.LUCENE_TOPN_LIMIT, PlannerSettings.INTERMEDIATE_LOCAL_RELATION_MAX_SIZE, PlannerSettings.REDUCTION_LATE_MATERIALIZATION, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java index 5a3a421660683..ae423443f6150 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java @@ -16,6 +16,7 @@ import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.planner.PlannerSettings; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; import org.elasticsearch.xpack.esql.plugin.EsqlFlags; @@ -76,6 +77,10 @@ private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats) { } private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats, EsqlFlags esqlFlags) { + return optimizedPlan(plan, TEST_PLANNER_SETTINGS, searchStats, esqlFlags); + } + + public PhysicalPlan optimizedPlan(PhysicalPlan plan, PlannerSettings plannerSettings, SearchStats searchStats, EsqlFlags esqlFlags) { // System.out.println("* Physical Before\n" + plan); var physicalPlan = EstimatesRowSize.estimateRowSize(0, physicalPlanOptimizer.optimize(plan)); // System.out.println("* Physical After\n" + physicalPlan); @@ -87,7 +92,7 @@ private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats, E new LocalLogicalOptimizerContext(config, FoldContext.small(), searchStats) ); var physicalTestOptimizer = new TestLocalPhysicalPlanOptimizer( - new LocalPhysicalOptimizerContext(TEST_PLANNER_SETTINGS, esqlFlags, config, FoldContext.small(), searchStats), + new LocalPhysicalOptimizerContext(plannerSettings, esqlFlags, config, FoldContext.small(), searchStats), true ); var l = PlannerUtils.localPlan(physicalPlan, logicalTestOptimizer, physicalTestOptimizer, null); @@ -99,7 +104,7 @@ private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats, E return l; } - private PhysicalPlan physicalPlan(String query, Analyzer analyzer) { + public PhysicalPlan physicalPlan(String query, Analyzer analyzer) { LogicalPlan logical = logicalOptimizer.optimize(analyzer.analyze(EsqlParser.INSTANCE.parseQuery(query))); // System.out.println("Logical\n" + logical); return mapper.map(new Versioned<>(logical, analyzer.context().minimumVersion())); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PartitionTimeSeriesTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PartitionTimeSeriesTests.java new file mode 100644 index 0000000000000..20ac3c3a5a65d --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PartitionTimeSeriesTests.java @@ -0,0 +1,276 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer.rules.physical.local; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.time.DateFormatter; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.compute.lucene.DataPartitioning; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.ExistsQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.esql.EsqlTestUtils; +import org.elasticsearch.xpack.esql.core.expression.FoldContext; +import org.elasticsearch.xpack.esql.optimizer.AbstractLocalPhysicalPlanOptimizerTests; +import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext; +import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer; +import org.elasticsearch.xpack.esql.optimizer.TestPlannerOptimizer; +import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.planner.PlannerSettings; +import org.elasticsearch.xpack.esql.planner.PlannerUtils; +import org.elasticsearch.xpack.esql.plugin.EsqlFlags; +import org.elasticsearch.xpack.esql.plugin.QueryPragmas; +import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery; +import org.elasticsearch.xpack.esql.session.Configuration; +import org.elasticsearch.xpack.esql.stats.SearchStats; + +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; + +public class PartitionTimeSeriesTests extends AbstractLocalPhysicalPlanOptimizerTests { + final DateFormatter fmt = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER; + + public PartitionTimeSeriesTests(String name, Configuration config) { + super(name, config); + } + + private void runTest( + long totalDocs, + String minTimestampFromData, + String maxTimestampFromData, + String filter, + ByteSizeValue rateBufferSize, + int taskConcurrency, + List expectedRanges + ) { + Map minValue = Map.of("@timestamp", fmt.parseMillis(minTimestampFromData)); + Map maxValue = Map.of("@timestamp", fmt.parseMillis(maxTimestampFromData)); + SearchStats searchStats = new EsqlTestUtils.TestSearchStatsWithMinMax(minValue, maxValue) { + @Override + public Map targetShards() { + var indexMetadata = IndexMetadata.builder("k8s") + .settings( + ESTestCase.indexSettings(IndexVersion.current(), 1, 1) + .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES.name()) + ) + .build(); + return Map.of(new ShardId(new Index("id", "n/a"), 1), indexMetadata); + } + + @Override + public long count() { + return totalDocs; + } + }; + var plannerSettings = new PlannerSettings( + DataPartitioning.SHARD, + ByteSizeValue.ofMb(1), + rateBufferSize, + 10_000, + ByteSizeValue.ofMb(1) + ); + Configuration config = configuration( + new QueryPragmas( + Settings.builder().put(QueryPragmas.TASK_CONCURRENCY.getKey().toLowerCase(Locale.ROOT), taskConcurrency).build() + ) + ); + var planner = new TestPlannerOptimizer( + config, + timeSeriesAnalyzer, + new LogicalPlanOptimizer(new LogicalOptimizerContext(config, FoldContext.small(), TransportVersion.current())) + ); + String q = String.format(Locale.ROOT, """ + TS k8s + %s + | STATS max(rate(network.total_bytes_in)) BY cluster, BUCKET(@timestamp, 1 hour) + | LIMIT 10 + """, (filter != null ? "| WHERE " + filter : "")); + PhysicalPlan plan = planner.physicalPlan(q, timeSeriesAnalyzer); + plan = PlannerUtils.integrateEsFilterIntoFragment(plan, null); + plan = planner.optimizedPlan(plan, plannerSettings, searchStats, new EsqlFlags(true)); + EsQueryExec esQuery = (EsQueryExec) plan.collectFirstChildren(EsQueryExec.class::isInstance).getFirst(); + List actualRanges = new ArrayList<>(); + for (EsQueryExec.QueryBuilderAndTags qt : esQuery.queryBuilderAndTags()) { + BoolQueryBuilder boolQuery = (BoolQueryBuilder) qt.query(); + List filters = boolQuery.must(); + assertThat(filters, hasSize(2)); + assertThat(filters.get(0), instanceOf(ExistsQueryBuilder.class)); + SingleValueQuery.Builder singleValueQuery = as(filters.get(1), SingleValueQuery.Builder.class); + RangeQueryBuilder r = as(singleValueQuery.next(), RangeQueryBuilder.class); + assertThat(r.fieldName(), equalTo("@timestamp")); + assertTrue(r.includeLower()); + assertFalse(r.includeUpper()); + actualRanges.add(new QueryRange(r.from().toString(), r.to().toString())); + } + assertThat(actualRanges, equalTo(expectedRanges)); + } + + record QueryRange(String fromInclusive, String toInclusive) { + + } + + public void testPartition() { + runTest( + 20_000_000L, + "2023-10-20T12:15:03.360Z", + "2023-10-20T12:59:02.250Z", + null, + ByteSizeValue.ofMb(75), + 12, // 8 cpus + List.of( + new QueryRange("2023-10-20T12:15:03.360Z", "2023-10-20T12:20:00.000Z"), + new QueryRange("2023-10-20T12:20:00.000Z", "2023-10-20T12:25:00.000Z"), + new QueryRange("2023-10-20T12:25:00.000Z", "2023-10-20T12:30:00.000Z"), + new QueryRange("2023-10-20T12:30:00.000Z", "2023-10-20T12:35:00.000Z"), + new QueryRange("2023-10-20T12:35:00.000Z", "2023-10-20T12:40:00.000Z"), + new QueryRange("2023-10-20T12:40:00.000Z", "2023-10-20T12:45:00.000Z"), + new QueryRange("2023-10-20T12:45:00.000Z", "2023-10-20T12:50:00.000Z"), + new QueryRange("2023-10-20T12:50:00.000Z", "2023-10-20T12:55:00.000Z"), + new QueryRange("2023-10-20T12:55:00.000Z", "2023-10-20T12:59:02.251Z") + ) + ); + runTest( + 20_000_000L, + "2023-10-20T12:15:03.360Z", + "2023-10-20T12:59:02.250Z", + null, + ByteSizeValue.ofMb(75), + 6, // 4 cpus + List.of( + new QueryRange("2023-10-20T12:15:03.360Z", "2023-10-20T12:20:00.000Z"), + new QueryRange("2023-10-20T12:20:00.000Z", "2023-10-20T12:30:00.000Z"), + new QueryRange("2023-10-20T12:30:00.000Z", "2023-10-20T12:40:00.000Z"), + new QueryRange("2023-10-20T12:40:00.000Z", "2023-10-20T12:50:00.000Z"), + new QueryRange("2023-10-20T12:50:00.000Z", "2023-10-20T12:59:02.251Z") + ) + ); + + runTest( + 20_000_000L, + "2023-10-20T12:15:03.360Z", + "2023-10-20T12:59:02.250Z", + null, + ByteSizeValue.ofMb(75), + 3, // 2 cpus + List.of( + new QueryRange("2023-10-20T12:15:03.360Z", "2023-10-20T12:30:00.000Z"), + new QueryRange("2023-10-20T12:30:00.000Z", "2023-10-20T12:59:02.251Z") + ) + ); + + runTest( + 20_000_000L, + "2023-10-20T12:15:03.360Z", + "2023-10-20T12:59:02.250Z", + null, + ByteSizeValue.ofMb(30), // smaller buffer to more smaller slices + 3, // 2 cpus + List.of( + new QueryRange("2023-10-20T12:15:03.360Z", "2023-10-20T12:20:00.000Z"), + new QueryRange("2023-10-20T12:20:00.000Z", "2023-10-20T12:40:00.000Z"), + new QueryRange("2023-10-20T12:40:00.000Z", "2023-10-20T12:59:02.251Z") + ) + ); + runTest( + 20_000_000L, + "2023-10-20T12:15:03.360Z", + "2023-10-20T12:59:02.250Z", + null, + ByteSizeValue.ofMb(10), // smaller buffer to more smaller slices + 3, // 2 cpus + List.of( + new QueryRange("2023-10-20T12:15:03.360Z", "2023-10-20T12:20:00.000Z"), + new QueryRange("2023-10-20T12:20:00.000Z", "2023-10-20T12:25:00.000Z"), + new QueryRange("2023-10-20T12:25:00.000Z", "2023-10-20T12:30:00.000Z"), + new QueryRange("2023-10-20T12:30:00.000Z", "2023-10-20T12:35:00.000Z"), + new QueryRange("2023-10-20T12:35:00.000Z", "2023-10-20T12:40:00.000Z"), + new QueryRange("2023-10-20T12:40:00.000Z", "2023-10-20T12:45:00.000Z"), + new QueryRange("2023-10-20T12:45:00.000Z", "2023-10-20T12:50:00.000Z"), + new QueryRange("2023-10-20T12:50:00.000Z", "2023-10-20T12:55:00.000Z"), + new QueryRange("2023-10-20T12:55:00.000Z", "2023-10-20T12:59:02.251Z") + ) + ); + runTest( + 5_000_000L, // less docs + "2023-10-20T12:15:03.360Z", + "2023-10-20T12:59:02.250Z", + null, + ByteSizeValue.ofMb(10), // smaller buffer to more smaller slices + 3, // 2 cpus + List.of( + new QueryRange("2023-10-20T12:15:03.360Z", "2023-10-20T12:30:00.000Z"), + new QueryRange("2023-10-20T12:30:00.000Z", "2023-10-20T12:59:02.251Z") + ) + ); + } + + public void testPartitionWithQuery() { + runTest( + 20_000_000L, + "2023-11-20T12:15:03.360Z", + "2023-11-20T12:59:02.250Z", + null, + ByteSizeValue.ofMb(75), + 12, // 8 cpus + List.of( + new QueryRange("2023-11-20T12:15:03.360Z", "2023-11-20T12:20:00.000Z"), + new QueryRange("2023-11-20T12:20:00.000Z", "2023-11-20T12:25:00.000Z"), + new QueryRange("2023-11-20T12:25:00.000Z", "2023-11-20T12:30:00.000Z"), + new QueryRange("2023-11-20T12:30:00.000Z", "2023-11-20T12:35:00.000Z"), + new QueryRange("2023-11-20T12:35:00.000Z", "2023-11-20T12:40:00.000Z"), + new QueryRange("2023-11-20T12:40:00.000Z", "2023-11-20T12:45:00.000Z"), + new QueryRange("2023-11-20T12:45:00.000Z", "2023-11-20T12:50:00.000Z"), + new QueryRange("2023-11-20T12:50:00.000Z", "2023-11-20T12:55:00.000Z"), + new QueryRange("2023-11-20T12:55:00.000Z", "2023-11-20T12:59:02.251Z") + ) + ); + + String filter = randomFrom( + "\"2023-11-20T12:16:03.360Z\" <= @timestamp AND @timestamp <= \"2023-11-20T12:57:02.250Z\"", + "TRANGE(\"2023-11-20T12:16:03.359Z\", \"2023-11-20T12:57:02.250Z\")" + ); + runTest( + 20_000_000L, + "2023-11-20T12:15:03.360Z", + "2023-11-20T12:59:02.250Z", + filter, + ByteSizeValue.ofMb(75), + 12, // 8 cpus + List.of( + new QueryRange("2023-11-20T12:16:03.360Z", "2023-11-20T12:20:00.000Z"), + new QueryRange("2023-11-20T12:20:00.000Z", "2023-11-20T12:25:00.000Z"), + new QueryRange("2023-11-20T12:25:00.000Z", "2023-11-20T12:30:00.000Z"), + new QueryRange("2023-11-20T12:30:00.000Z", "2023-11-20T12:35:00.000Z"), + new QueryRange("2023-11-20T12:35:00.000Z", "2023-11-20T12:40:00.000Z"), + new QueryRange("2023-11-20T12:40:00.000Z", "2023-11-20T12:45:00.000Z"), + new QueryRange("2023-11-20T12:45:00.000Z", "2023-11-20T12:50:00.000Z"), + new QueryRange("2023-11-20T12:50:00.000Z", "2023-11-20T12:55:00.000Z"), + new QueryRange("2023-11-20T12:55:00.000Z", "2023-11-20T12:57:02.251Z") + ) + ); + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/SubstituteRoundToTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/SubstituteRoundToTests.java index caaeeabb3eecd..eb3c09c22ff59 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/SubstituteRoundToTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/SubstituteRoundToTests.java @@ -1006,17 +1006,6 @@ public Map targetShards() { return Map.of(new ShardId(new Index("id", "n/a"), 1), indexMetadata); } }; - // enable filter-by-filter for rate aggregations - { - String q = """ - TS k8s - | STATS max(rate(network.total_bytes_in)) BY cluster, BUCKET(@timestamp, 1 hour) - | LIMIT 10 - """; - PhysicalPlan plan = plannerOptimizerTimeSeries.plan(q, searchStats, timeSeriesAnalyzer); - int queryAndTags = plainQueryAndTags(plan); - assertThat(queryAndTags, equalTo(4)); - } // disable filter-by-filter for non-rate aggregations { String q = """ diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index d86d6747e2981..ac3fed8126cf7 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -278,7 +278,13 @@ public void testTimeSeries() throws IOException { 10, null ); - PlannerSettings plannerSettings = new PlannerSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000, ByteSizeValue.ofMb(1)); + PlannerSettings plannerSettings = new PlannerSettings( + DataPartitioning.AUTO, + ByteSizeValue.ofMb(1), + ByteSizeValue.ofMb(2), + 10_000, + ByteSizeValue.ofMb(1) + ); LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan( "test", FoldContext.small(),