diff --git a/docs/changelog/140475.yaml b/docs/changelog/140475.yaml deleted file mode 100644 index 183c8ab3379a0..0000000000000 --- a/docs/changelog/140475.yaml +++ /dev/null @@ -1,5 +0,0 @@ -pr: 140475 -summary: Partition time-series source -area: ES|QL -type: enhancement -issues: [] diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java index 4099ad74c0b0d..0181882515f9c 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterTimeSeriesIT.java @@ -52,7 +52,7 @@ @ThreadLeakFilters(filters = TestClustersThreadFilter.class) public class MultiClusterTimeSeriesIT extends ESRestTestCase { - static final List REQUIRED_CAPABILITIES = List.of("ts_command_v0", "rate_fix_resets_multiple_segments"); + static final List REQUIRED_CAPABILITIES = List.of("ts_command_v0"); static ElasticsearchCluster remoteCluster = Clusters.remoteCluster(); static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index f0af67c6a52a6..9c59e1167c7ca 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -588,7 +588,6 @@ public static LogicalOptimizerContext unboundLogicalOptimizerContext() { public static final PlannerSettings TEST_PLANNER_SETTINGS = new PlannerSettings( DataPartitioning.AUTO, ByteSizeValue.ofMb(1), - ByteSizeValue.ofMb(2), 10_000, ByteSizeValue.ofMb(1) ); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec index dce200ba24810..de72bebba001e 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries.csv-spec @@ -24,7 +24,6 @@ max_bytes:long | cluster: keyword maxRate required_capability: ts_command_v0 required_capability: double_quotes_source_enclosing -required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max=max(rate(network.total_bytes_in)) | EVAL max=ROUND(max, 6) | KEEP max; max: double @@ -34,7 +33,6 @@ max: double maxRatePerMinute required_capability: ts_command_v0 required_capability: double_quotes_source_enclosing -required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max=max(60 * rate(network.total_bytes_in)) | EVAL max=ROUND(max, 6) | KEEP max; max: double @@ -43,7 +41,6 @@ max: double maxCost required_capability: ts_command_v0 -required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_cost=max(rate(network.total_cost)) | EVAL max_cost=ROUND(max_cost, 6) | KEEP max_cost; max_cost: double @@ -52,7 +49,6 @@ max_cost: double maxRateAndBytes required_capability: ts_command_v0 -required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_rate=max(60 * rate(network.total_bytes_in)), max_bytes=max(network.bytes_in) | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, max_bytes; max_rate: double | max_bytes: long @@ -61,7 +57,6 @@ max_rate: double | max_bytes: long maxRateAndBytesExplicit required_capability: ts_command_v0 -required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_rate=max(60 * rate(network.total_bytes_in)), max_bytes=max(last_over_time(network.bytes_in)) | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, max_bytes; max_rate: double | max_bytes: long @@ -70,7 +65,6 @@ max_rate: double | max_bytes: long maxRateAndMarkupBytes required_capability: ts_command_v0 -required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_rate=max(rate(network.total_bytes_in)), max_markup=max(network.bytes_in * 1.05) | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, max_markup; max_rate: double | max_markup: double @@ -79,7 +73,6 @@ max_rate: double | max_markup: double maxRateAndMarkupBytesExplicit required_capability: ts_command_v0 -required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_rate=max(rate(network.total_bytes_in)), max_bytes_in = max(last_over_time(network.bytes_in) * 1.05) | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, max_bytes_in; max_rate: double | max_bytes_in: double @@ -88,7 +81,6 @@ max_rate: double | max_bytes_in: double maxRateAndLastBytesIn required_capability: ts_command_v0 -required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_rate=max(rate(network.total_bytes_in)), max_bytes_in = max(last_over_time(network.bytes_in * 1.05)) | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, max_bytes_in; max_rate: double | max_bytes_in: double @@ -97,7 +89,6 @@ max_rate: double | max_bytes_in: double maxRateAndBytesAndCost required_capability: ts_command_v0 -required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_rate=max(rate(network.total_bytes_in)), max_bytes=max(max_over_time(network.bytes_in)), max_cost=max(rate(network.total_cost)) | EVAL max_rate=ROUND(max_rate, 6), max_cost=ROUND(max_cost, 6) | KEEP max_rate, max_bytes, max_cost; max_rate:double | max_bytes:long | max_cost:double @@ -106,7 +97,6 @@ max_rate:double | max_bytes:long | max_cost:double sumRate required_capability: ts_command_v0 -required_capability: rate_fix_resets_multiple_segments TS k8s | STATS bytes=sum(rate(network.total_bytes_in)), sum_cost=sum(rate(network.total_cost)) BY cluster | EVAL bytes=ROUND(bytes, 6), sum_cost=ROUND(sum_cost, 6) | KEEP bytes, sum_cost, cluster | SORT cluster; bytes: double | sum_cost: double | cluster: keyword @@ -118,7 +108,6 @@ bytes: double | sum_cost: double | cluster: keyword oneRateWithBucket required_capability: ts_command_v0 required_capability: rate_with_interpolation_v2 -required_capability: rate_fix_resets_multiple_segments // tag::rate[] TS k8s @@ -136,8 +125,8 @@ max_rate: double | time_bucket:date oneRateWithTBucket required_capability: ts_command_v0 +required_capability: tbucket required_capability: rate_with_interpolation_v2 -required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_rate=max(rate(network.total_bytes_in)) BY time_bucket = tbucket(5minute) @@ -152,7 +141,6 @@ max_rate: double | time_bucket:date oneRateWithPromql required_capability: promql_pre_tech_preview_v7 required_capability: rate_with_interpolation_v2 -required_capability: rate_fix_resets_multiple_segments PROMQL index=k8s step=5m max(rate(network.total_bytes_in[5m])) | SORT step DESC | LIMIT 2; @@ -165,8 +153,7 @@ max(rate(network.total_bytes_in[5m])):double | step:datetime oneRateWithSingleTBucket required_capability: ts_command_v0 -required_capability: rate_fix_resets_multiple_segments - +required_capability: tbucket TS k8s | STATS max_rate=max(rate(network.total_bytes_in)) BY time_bucket = tbucket(1h) | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, time_bucket; @@ -177,7 +164,6 @@ max_rate: double | time_bucket:date oneRateWithSingleStepPromql required_capability: promql_pre_tech_preview_v7 -required_capability: rate_fix_resets_multiple_segments PROMQL index=k8s step=1h max(rate(network.total_bytes_in[1h])); max(rate(network.total_bytes_in[1h])):double | step:datetime @@ -188,7 +174,6 @@ max(rate(network.total_bytes_in[1h])):double | step:datetime twoRatesWithBucket required_capability: ts_command_v0 required_capability: rate_with_interpolation_v2 -required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_rate=max(rate(network.total_bytes_in)), sum_rate=sum(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,5minute) | EVAL max_rate=ROUND(max_rate, 6), sum_rate=ROUND(sum_rate, 6) | KEEP max_rate, sum_rate, time_bucket | SORT time_bucket DESC | LIMIT 3; @@ -200,9 +185,8 @@ max_rate:double | sum_rate:double | time_bucket:datetime twoRatesWithTBucket required_capability: ts_command_v0 +required_capability: tbucket required_capability: rate_with_interpolation_v2 -required_capability: rate_fix_resets_multiple_segments - TS k8s | STATS max_rate=max(rate(network.total_bytes_in)), sum_rate=sum(rate(network.total_bytes_in)) BY time_bucket = tbucket(5minute) | EVAL max_rate=ROUND(max_rate, 6), sum_rate=ROUND(sum_rate, 6) | KEEP max_rate, sum_rate, time_bucket | SORT time_bucket DESC | LIMIT 3; max_rate:double | sum_rate:double | time_bucket:datetime @@ -214,8 +198,6 @@ max_rate:double | sum_rate:double | time_bucket:datetime oneRateWithBucketAndCluster required_capability: ts_command_v0 required_capability: rate_with_interpolation_v2 -required_capability: rate_fix_resets_multiple_segments - TS k8s | STATS max_rate=max(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,5minute), cluster | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, time_bucket, cluster | SORT time_bucket DESC, cluster | LIMIT 6; max_rate:double | time_bucket:datetime | cluster:keyword @@ -230,7 +212,6 @@ max_rate:double | time_bucket:datetime | cluster:keyword BytesAndCostByBucketAndCluster required_capability: ts_command_v0 required_capability: rate_with_interpolation_v2 -required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_rate=max(rate(network.total_bytes_in)), max_cost=max(max_over_time(network.cost)) BY time_bucket = bucket(@timestamp,5minute), cluster | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, max_cost, time_bucket, cluster | SORT time_bucket DESC, cluster | LIMIT 6; @@ -247,7 +228,6 @@ max_rate:double | max_cost:double | time_bucket:date | cluster: keyword oneRateWithBucketAndClusterThenFilter required_capability: ts_command_v0 required_capability: rate_with_interpolation_v2 -required_capability: rate_fix_resets_multiple_segments TS k8s | WHERE cluster=="prod" | STATS max_rate=max(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,5minute), cluster | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, time_bucket, cluster | SORT time_bucket DESC | LIMIT 3; @@ -260,8 +240,6 @@ max_rate:double | time_bucket:datetime | cluster:keyword maxRateWithInlineFilter required_capability: ts_command_v0 -required_capability: rate_fix_resets_multiple_segments - TS k8s | STATS max_rate = max(rate(network.total_bytes_in)) WHERE cluster=="prod" BY cluster | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, cluster | SORT cluster; max_rate:double | cluster:keyword @@ -272,8 +250,6 @@ null | staging maxRateWithPreFilter required_capability: ts_command_v0 -required_capability: rate_fix_resets_multiple_segments - TS k8s | WHERE cluster=="prod" | STATS max_rate = max(rate(network.total_bytes_in)) BY cluster | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, cluster | SORT cluster; max_rate:double | cluster:keyword @@ -283,7 +259,6 @@ max_rate:double | cluster:keyword notEnoughSamples required_capability: ts_command_v0 required_capability: rate_with_interpolation -required_capability: rate_fix_resets_multiple_segments TS k8s | WHERE @timestamp <= "2024-05-10T00:06:14.000Z" | STATS max_rate=max(rate(network.total_bytes_in)) BY pod, time_bucket = bucket(@timestamp,1minute) | EVAL max_rate=ROUND(max_rate, 6) | KEEP max_rate, pod, time_bucket | SORT pod, time_bucket DESC | LIMIT 10; @@ -541,7 +516,6 @@ distincts:long | distincts_imprecise:long | cluster:keyword | time_bucket:dateti two_rates required_capability: ts_command_v0 required_capability: rate_with_interpolation -required_capability: rate_fix_resets_multiple_segments TS k8s | STATS cost_per_mb=max(rate(network.total_bytes_in) / 1024 * 1024 * rate(network.total_cost)) BY cluster, time_bucket = bucket(@timestamp,5minute) | EVAL cost_per_mb=ROUND(cost_per_mb, 6) | KEEP cost_per_mb, cluster, time_bucket | SORT cost_per_mb DESC, cluster, time_bucket DESC | LIMIT 5; @@ -742,7 +716,6 @@ sd:double | var:double | sd_squared:double | cluster:keyword | time_bucket:datet Max of Rate with Bucket required_capability: ts_command_v0 -required_capability: rate_fix_resets_multiple_segments TS k8s | STATS maxRate = max(rate(network.total_cost)) BY tbucket = bucket(@timestamp, 1hour) @@ -754,7 +727,6 @@ maxRate:double | tbucket:datetime Max of Rate with Bucket, rename _tsid required_capability: ts_command_v0 -required_capability: rate_fix_resets_multiple_segments TS k8s METADATA _tsid | RENAME _tsid AS newTsid @@ -769,7 +741,6 @@ maxRate:double | tbucket:datetime Max of Rate with Bucket, drop _tsid required_capability: ts_command_v0 -required_capability: rate_fix_resets_multiple_segments TS k8s METADATA _tsid | DROP _tsid @@ -972,7 +943,6 @@ max:double | inc:double | time_bucket:date_nanos | cluster:keyword dateNanosOneRateIncreaseWithBucketAndClusterThenFilter required_capability: ts_command_v0 required_capability: ts_rate_datenanos_2 -required_capability: rate_fix_resets_multiple_segments TS datenanos-k8s | WHERE cluster=="prod" @@ -1021,7 +991,6 @@ required_capability: ts_command_v0 required_capability: ts_linreg_derivative required_capability: ts_deriv_datenanos required_capability: rate_with_interpolation_v2 -required_capability: rate_fix_resets_multiple_segments TS k8s | STATS max_deriv = max(deriv(to_long(network.total_bytes_in))), max_rate = max(rate(network.total_bytes_in)) BY time_bucket = bucket(@timestamp,5minute), cluster diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java index 38854292dd7a4..6da74fdd564bb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java @@ -7,20 +7,16 @@ package org.elasticsearch.xpack.esql.optimizer.rules.physical.local; -import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.xpack.esql.core.expression.Alias; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.AttributeMap; import org.elasticsearch.xpack.esql.core.expression.Expression; -import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; import org.elasticsearch.xpack.esql.core.expression.predicate.operator.comparison.BinaryComparison; import org.elasticsearch.xpack.esql.core.querydsl.query.Query; import org.elasticsearch.xpack.esql.core.util.CollectionUtils; -import org.elasticsearch.xpack.esql.core.util.Holder; import org.elasticsearch.xpack.esql.core.util.Queries; -import org.elasticsearch.xpack.esql.expression.function.TimestampAware; import org.elasticsearch.xpack.esql.expression.predicate.Predicates; import org.elasticsearch.xpack.esql.expression.predicate.Range; import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.EsqlBinaryComparison; @@ -29,12 +25,11 @@ import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.LessThan; import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.LessThanOrEqual; import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext; +import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; import org.elasticsearch.xpack.esql.plan.physical.EvalExec; import org.elasticsearch.xpack.esql.plan.physical.FilterExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; -import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec; -import org.elasticsearch.xpack.esql.rule.ParameterizedRule; import java.util.ArrayList; import java.util.List; @@ -44,27 +39,20 @@ import static org.elasticsearch.xpack.esql.expression.predicate.Predicates.splitAnd; import static org.elasticsearch.xpack.esql.planner.TranslatorHandler.TRANSLATOR_HANDLER; -public class PushFiltersToSource extends ParameterizedRule { +public class PushFiltersToSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule { @Override - public PhysicalPlan apply(PhysicalPlan rootPlan, LocalPhysicalOptimizerContext ctx) { - return rootPlan.transformDown(FilterExec.class, filterExec -> { - if (filterExec.child() instanceof EsQueryExec queryExec) { - return planFilterExec(filterExec, queryExec, ctx, timestampFieldForTimeSeries(rootPlan, queryExec)); - } else if (filterExec.child() instanceof EvalExec evalExec && evalExec.child() instanceof EsQueryExec queryExec) { - return planFilterExec(filterExec, evalExec, queryExec, ctx, timestampFieldForTimeSeries(rootPlan, queryExec)); - } else { - return filterExec; - } - }); + protected PhysicalPlan rule(FilterExec filterExec, LocalPhysicalOptimizerContext ctx) { + PhysicalPlan plan = filterExec; + if (filterExec.child() instanceof EsQueryExec queryExec) { + plan = planFilterExec(filterExec, queryExec, ctx); + } else if (filterExec.child() instanceof EvalExec evalExec && evalExec.child() instanceof EsQueryExec queryExec) { + plan = planFilterExec(filterExec, evalExec, queryExec, ctx); + } + return plan; } - private static PhysicalPlan planFilterExec( - FilterExec filterExec, - EsQueryExec queryExec, - LocalPhysicalOptimizerContext ctx, - Expression timestampField - ) { + private static PhysicalPlan planFilterExec(FilterExec filterExec, EsQueryExec queryExec, LocalPhysicalOptimizerContext ctx) { LucenePushdownPredicates pushdownPredicates = LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags()); List pushable = new ArrayList<>(); List nonPushable = new ArrayList<>(); @@ -78,15 +66,14 @@ private static PhysicalPlan planFilterExec( } } } - return rewrite(ctx, pushdownPredicates, filterExec, queryExec, pushable, nonPushable, List.of(), timestampField); + return rewrite(pushdownPredicates, filterExec, queryExec, pushable, nonPushable, List.of()); } private static PhysicalPlan planFilterExec( FilterExec filterExec, EvalExec evalExec, EsQueryExec queryExec, - LocalPhysicalOptimizerContext ctx, - Expression timestampField + LocalPhysicalOptimizerContext ctx ) { LucenePushdownPredicates pushdownPredicates = LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags()); AttributeMap aliasReplacedBy = getAliasReplacedBy(evalExec); @@ -105,7 +92,7 @@ private static PhysicalPlan planFilterExec( } // Replace field references with their actual field attributes pushable.replaceAll(e -> e.transformDown(ReferenceAttribute.class, r -> aliasReplacedBy.resolve(r, r))); - return rewrite(ctx, pushdownPredicates, filterExec, queryExec, pushable, nonPushable, evalExec.fields(), timestampField); + return rewrite(pushdownPredicates, filterExec, queryExec, pushable, nonPushable, evalExec.fields()); } static AttributeMap getAliasReplacedBy(EvalExec evalExec) { @@ -119,60 +106,44 @@ static AttributeMap getAliasReplacedBy(EvalExec evalExec) { } private static PhysicalPlan rewrite( - LocalPhysicalOptimizerContext ctx, LucenePushdownPredicates pushdownPredicates, FilterExec filterExec, EsQueryExec queryExec, List pushable, List nonPushable, - List evalFields, - Expression timestampField + List evalFields ) { - EsQueryExec newQueryExec = null; - if (queryExec.indexMode() == IndexMode.TIME_SERIES && timestampField != null) { - newQueryExec = TimeSeriesSourcePartitioner.partitionTimeSeriesSource( - ctx, - timestampField, - pushdownPredicates, - queryExec, - pushable + // Combine GT, GTE, LT and LTE in pushable to Range if possible + List newPushable = combineEligiblePushableToRange(pushable); + if (newPushable.size() > 0) { // update the executable with pushable conditions + Query queryDSL = TRANSLATOR_HANDLER.asQuery(pushdownPredicates, Predicates.combineAnd(newPushable)); + QueryBuilder planQuery = queryDSL.toQueryBuilder(); + Queries.Clause combiningQueryClauseType = queryExec.hasScoring() ? Queries.Clause.MUST : Queries.Clause.FILTER; + var query = Queries.combine(combiningQueryClauseType, asList(queryExec.query(), planQuery)); + queryExec = new EsQueryExec( + queryExec.source(), + queryExec.indexPattern(), + queryExec.indexMode(), + queryExec.output(), + queryExec.limit(), + queryExec.sorts(), + queryExec.estimatedRowSize(), + List.of(new EsQueryExec.QueryBuilderAndTags(query, List.of())) ); - } - if (newQueryExec == null) { - // Combine GT, GTE, LT and LTE in pushable to Range if possible - List newPushable = combineEligiblePushableToRange(pushable); - if (newPushable.isEmpty() == false) { - Query queryDSL = TRANSLATOR_HANDLER.asQuery(pushdownPredicates, Predicates.combineAnd(newPushable)); - QueryBuilder planQuery = queryDSL.toQueryBuilder(); - Queries.Clause combiningQueryClauseType = queryExec.hasScoring() ? Queries.Clause.MUST : Queries.Clause.FILTER; - var query = Queries.combine(combiningQueryClauseType, asList(queryExec.query(), planQuery)); - newQueryExec = new EsQueryExec( - queryExec.source(), - queryExec.indexPattern(), - queryExec.indexMode(), - queryExec.output(), - queryExec.limit(), - queryExec.sorts(), - queryExec.estimatedRowSize(), - List.of(new EsQueryExec.QueryBuilderAndTags(query, List.of())) - ); - } - } - if (newQueryExec != null) { // If the eval contains other aliases, not just field attributes, we need to keep them in the plan - PhysicalPlan plan = evalFields.isEmpty() ? newQueryExec : new EvalExec(filterExec.source(), newQueryExec, evalFields); - if (nonPushable.isEmpty()) { - // prune Filter entirely - return plan; - } else { + PhysicalPlan plan = evalFields.isEmpty() ? queryExec : new EvalExec(filterExec.source(), queryExec, evalFields); + if (nonPushable.size() > 0) { // update filter with remaining non-pushable conditions return new FilterExec(filterExec.source(), plan, Predicates.combineAnd(nonPushable)); + } else { + // prune Filter entirely + return plan; } - } + } // else: nothing changes return filterExec; } - static List combineEligiblePushableToRange(List pushable) { + private static List combineEligiblePushableToRange(List pushable) { List bcs = new ArrayList<>(); List ranges = new ArrayList<>(); List others = new ArrayList<>(); @@ -185,8 +156,6 @@ static List combineEligiblePushableToRange(List pushable } else { others.add(e); } - } else if (e instanceof Range r) { - ranges.add(r); } else { others.add(e); } @@ -246,22 +215,4 @@ else if ((other instanceof GreaterThan || other instanceof GreaterThanOrEqual) } return changed ? CollectionUtils.combine(others, bcs, ranges) : pushable; } - - private static Expression timestampFieldForTimeSeries(PhysicalPlan plan, EsQueryExec queryExec) { - if (queryExec.indexMode() != IndexMode.TIME_SERIES) { - return null; - } - Holder timestampHolder = new Holder<>(); - plan.forEachDown(TimeSeriesAggregateExec.class, ts -> { - for (NamedExpression agg : ts.aggregates()) { - if (Alias.unwrap(agg) instanceof TimestampAware timestampAware) { - if (timestampAware.timestamp() != null) { - timestampHolder.set(timestampAware.timestamp()); - return; - } - } - } - }); - return timestampHolder.get(); - } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/TimeSeriesSourcePartitioner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/TimeSeriesSourcePartitioner.java deleted file mode 100644 index 55e7072959792..0000000000000 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/TimeSeriesSourcePartitioner.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.esql.optimizer.rules.physical.local; - -import org.elasticsearch.common.util.CollectionUtils; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.core.Tuple; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.xpack.esql.core.expression.Expression; -import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; -import org.elasticsearch.xpack.esql.core.expression.Literal; -import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute; -import org.elasticsearch.xpack.esql.core.querydsl.query.Query; -import org.elasticsearch.xpack.esql.core.type.DataType; -import org.elasticsearch.xpack.esql.core.util.Queries; -import org.elasticsearch.xpack.esql.expression.predicate.Predicates; -import org.elasticsearch.xpack.esql.expression.predicate.Range; -import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.Equals; -import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.EsqlBinaryComparison; -import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThan; -import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThanOrEqual; -import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.LessThan; -import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.LessThanOrEqual; -import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext; -import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; - -import java.util.ArrayList; -import java.util.List; - -import static java.util.Arrays.asList; -import static org.elasticsearch.xpack.esql.planner.TranslatorHandler.TRANSLATOR_HANDLER; - -/** - * Partitions time-series data source queries into multiple queries based on time intervals. - * The ideal number of slices should be close to the number of available processors. - * Too many slices may lead to high overhead; too few slices may cause under-utilization of CPUs - * and require high memory to buffer data points for rate calculations. - * Prefer partitioning the query interval into small, fixed, and predefined intervals to improve cache hit rate. - */ -final class TimeSeriesSourcePartitioner { - - static EsQueryExec partitionTimeSeriesSource( - LocalPhysicalOptimizerContext ctx, - Expression timestampField, - LucenePushdownPredicates pushdownPredicates, - EsQueryExec queryExec, - List pushable - ) { - // TODO: support date_nanos - if (timestampField == null || timestampField.dataType() != DataType.DATETIME) { - return null; - } - if (pushdownPredicates.isPushableFieldAttribute(timestampField) == false) { - return null; - } - Object minTimestampFromData = ctx.searchStats().min(new FieldAttribute.FieldName(MetadataAttribute.TIMESTAMP_FIELD)); - Object maxTimestampFromData = ctx.searchStats().max(new FieldAttribute.FieldName(MetadataAttribute.TIMESTAMP_FIELD)); - if (minTimestampFromData instanceof Long minTs && maxTimestampFromData instanceof Long maxTs) { - maxTs++; // maxTs from data is inclusive, make it exclusive - long dataInterval = maxTs - minTs; - Tuple minMaxFromPredicates = minMaxTimestampFromQuery(timestampField, pushable); - minTs = Math.max(minTs, minMaxFromPredicates.v1()); - maxTs = Math.min(maxTs, minMaxFromPredicates.v2()); - var filters = partitionFiltersByTimeIntervals(ctx, timestampField, dataInterval, minTs, maxTs); - if (filters.isEmpty() || filters.size() == 1) { - return null; - } - return pushDownFiltersAndPartitionFilters( - pushdownPredicates, - queryExec, - removeTimestampFilters(timestampField, pushable), - filters - ); - } - return null; - } - - private static EsQueryExec pushDownFiltersAndPartitionFilters( - LucenePushdownPredicates pushdownPredicates, - EsQueryExec queryExec, - List pushable, - List partitionFilters - ) { - List queryAndTags = new ArrayList<>(); - QueryBuilder mainQuery = queryExec.query(); - for (Range partition : partitionFilters) { - var newPushable = PushFiltersToSource.combineEligiblePushableToRange(CollectionUtils.appendToCopy(pushable, partition)); - Query queryDSL = TRANSLATOR_HANDLER.asQuery(pushdownPredicates, Predicates.combineAnd(newPushable)); - QueryBuilder planQuery = queryDSL.toQueryBuilder(); - QueryBuilder partitionQuery = Queries.combine(Queries.Clause.FILTER, asList(mainQuery, planQuery)); - queryAndTags.add(new EsQueryExec.QueryBuilderAndTags(partitionQuery, List.of())); - } - return new EsQueryExec( - queryExec.source(), - queryExec.indexPattern(), - queryExec.indexMode(), - queryExec.output(), - queryExec.limit(), - queryExec.sorts(), - queryExec.estimatedRowSize(), - queryAndTags - ); - } - - private static List removeTimestampFilters(Expression timestampField, List pushable) { - List removed = new ArrayList<>(pushable.size()); - for (Expression expr : pushable) { - if (expr instanceof EsqlBinaryComparison bin - && bin.right().foldable() - && bin.right() instanceof Literal l - && l.value() instanceof Long - && bin.left().dataType() == timestampField.dataType() - && bin.left().semanticEquals(timestampField)) { - continue; - } - if (expr instanceof Range r && r.value().dataType() == timestampField.dataType() && r.value().semanticEquals(timestampField)) { - continue; - } - removed.add(expr); - } - return removed; - } - - private static List partitionFiltersByTimeIntervals( - LocalPhysicalOptimizerContext ctx, - Expression timestampField, - long dataInterval, - long queryStartTs, - long queryEndTs - ) { - long maxRateBufferBytes = ctx.plannerSettings().getRateBufferSize().getBytes(); - int taskConcurrency = ctx.configuration().pragmas().taskConcurrency(); - long queryInterval = queryEndTs - queryStartTs; - // The number of data points should be based on the number of docs with the counter field, not the total docs. - // Here we estimate that by assuming the counter field appears in 10% of documents on average. - // TODO: access the TSDB codec for the accurate estimate - long totalDocs = ctx.searchStats().count() / 10; - long queryDocs = totalDocs * queryInterval / dataInterval; - long selectedInterval = -1; - long selectedSlices = -1; - // TODO: retrieve this from planner settings instead - final int availableProcessors = Math.ceilDiv(taskConcurrency * 2, 3); - for (int i = TIME_SERIES_PARTITION_INTERVALS.length - 1; i >= 0; i--) { - long sliceInterval = TIME_SERIES_PARTITION_INTERVALS[i]; - long numSlices = Math.ceilDiv(queryInterval, sliceInterval); - long docPerSlice = queryDocs / numSlices; - long requiredBytes = docPerSlice * 2 * Long.BYTES * Math.min(availableProcessors, numSlices); - if (requiredBytes <= maxRateBufferBytes) { - if (selectedInterval == -1 - || numSlices <= availableProcessors - || (selectedSlices < availableProcessors && numSlices <= taskConcurrency)) { - selectedInterval = sliceInterval; - selectedSlices = numSlices; - } else { - break; - } - } - } - if (selectedInterval == -1) { - return List.of(); - } - List filters = new ArrayList<>(); - // round the intervals to improve caching - var pt = (queryStartTs + selectedInterval - 1) / selectedInterval * selectedInterval; - if (queryStartTs < pt) { - filters.add( - new Range( - timestampField.source(), - timestampField, - new Literal(timestampField.source(), queryStartTs, timestampField.dataType()), - true, - new Literal(timestampField.source(), pt, timestampField.dataType()), - false, - ctx.configuration().zoneId() - ) - ); - } - while (pt < queryEndTs) { - long nextPt = Math.min(pt + selectedInterval, queryEndTs); - filters.add( - new Range( - timestampField.source(), - timestampField, - new Literal(timestampField.source(), pt, timestampField.dataType()), - true, - new Literal(timestampField.source(), nextPt, timestampField.dataType()), - false, - ctx.configuration().zoneId() - ) - ); - pt = nextPt; - } - return filters; - } - - private static Tuple minMaxTimestampFromQuery(Expression timestampField, List expressions) { - long minTimestamp = Long.MIN_VALUE; - long maxTimestamp = Long.MAX_VALUE; - for (Expression expr : expressions) { - if (expr instanceof EsqlBinaryComparison bin - && bin.right().foldable() - && bin.right() instanceof Literal l - && l.value() instanceof Long v - && bin.left().dataType() == timestampField.dataType() - && bin.left().semanticEquals(timestampField)) { - switch (expr) { - case Equals unused -> { - minTimestamp = Math.max(minTimestamp, v); - maxTimestamp = Math.min(maxTimestamp, v + 1); - } - case GreaterThan unused -> minTimestamp = Math.max(minTimestamp, v + 1); - case GreaterThanOrEqual unused -> minTimestamp = Math.max(minTimestamp, v); - case LessThan unused -> maxTimestamp = Math.min(maxTimestamp, v); - case LessThanOrEqual unused -> maxTimestamp = Math.min(maxTimestamp, v + 1); - default -> { - } - } - } - if (expr instanceof Range r && r.value().dataType() == timestampField.dataType() && r.value().semanticEquals(timestampField)) { - if (r.lower() instanceof Literal lowerLit && lowerLit.value() instanceof Long lowerVal) { - if (r.includeLower()) { - minTimestamp = Math.max(minTimestamp, lowerVal); - } else { - minTimestamp = Math.max(minTimestamp, lowerVal + 1); - } - } - if (r.upper() instanceof Literal upperLit && upperLit.value() instanceof Long upperVal) { - if (r.includeUpper()) { - maxTimestamp = Math.min(maxTimestamp, upperVal + 1); - } else { - maxTimestamp = Math.min(maxTimestamp, upperVal); - } - } - } - } - return Tuple.tuple(minTimestamp, maxTimestamp); - } - - static final long[] TIME_SERIES_PARTITION_INTERVALS = new long[] { - TimeValue.timeValueSeconds(1).millis(), - TimeValue.timeValueSeconds(2).millis(), - TimeValue.timeValueSeconds(5).millis(), - TimeValue.timeValueSeconds(10).millis(), - TimeValue.timeValueSeconds(20).millis(), - TimeValue.timeValueSeconds(30).millis(), - - TimeValue.timeValueMinutes(1).millis(), - TimeValue.timeValueMinutes(2).millis(), - TimeValue.timeValueMinutes(3).millis(), - TimeValue.timeValueMinutes(5).millis(), - TimeValue.timeValueMinutes(10).millis(), - TimeValue.timeValueMinutes(15).millis(), - TimeValue.timeValueMinutes(20).millis(), - TimeValue.timeValueMinutes(30).millis(), - - TimeValue.timeValueHours(1).millis(), - TimeValue.timeValueHours(2).millis(), - TimeValue.timeValueHours(3).millis(), - TimeValue.timeValueHours(5).millis(), - TimeValue.timeValueHours(12).millis(), - - TimeValue.timeValueDays(1).millis(), - TimeValue.timeValueDays(7).millis() }; -} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java index 51b358590b418..426dd032df91c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java @@ -290,11 +290,8 @@ public List queryBuilderAndTags() { } public boolean canSubstituteRoundToWithQueryBuilderAndTags() { - boolean queryWithoutTag = queryBuilderAndTags == null - || queryBuilderAndTags.isEmpty() - || (queryBuilderAndTags.size() == 1 && queryBuilderAndTags.getFirst().tags.isEmpty()); // LuceneTopNSourceOperator doesn't support QueryAndTags - return queryWithoutTag && (sorts == null || sorts.isEmpty()); + return sorts == null || sorts.isEmpty(); } /** diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerSettings.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerSettings.java index 18747a311b7e2..7225399aa14b3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerSettings.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerSettings.java @@ -37,15 +37,6 @@ public class PlannerSettings { Setting.Property.Dynamic ); - public static final Setting RATE_BUFFER_SIZE = new Setting<>("esql.rate_buffer_size", settings -> { - long oneTenth = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() / 10; - return ByteSizeValue.ofBytes(Math.max(oneTenth, ByteSizeValue.ofMb(1).getBytes())).getStringRep(); - }, - s -> MemorySizeValue.parseBytesSizeValueOrHeapRatio(s, "esql.rate_buffer_size"), - Setting.Property.NodeScope, - Setting.Property.Dynamic - ); - public static final Setting LUCENE_TOPN_LIMIT = Setting.intSetting( "esql.lucene_topn_limit", IndexSettings.MAX_RESULT_WINDOW_SETTING.getDefault(Settings.EMPTY), @@ -70,7 +61,6 @@ public class PlannerSettings { private volatile DataPartitioning defaultDataPartitioning; private volatile ByteSizeValue valuesLoadingJumboSize; - private volatile ByteSizeValue rateBufferSize; private volatile int luceneTopNLimit; private volatile ByteSizeValue intermediateLocalRelationMaxSize; @@ -81,7 +71,6 @@ public PlannerSettings(ClusterService clusterService) { var clusterSettings = clusterService.getClusterSettings(); clusterSettings.initializeAndWatch(DEFAULT_DATA_PARTITIONING, v -> this.defaultDataPartitioning = v); clusterSettings.initializeAndWatch(VALUES_LOADING_JUMBO_SIZE, v -> this.valuesLoadingJumboSize = v); - clusterSettings.initializeAndWatch(RATE_BUFFER_SIZE, v -> this.rateBufferSize = v); clusterSettings.initializeAndWatch(LUCENE_TOPN_LIMIT, v -> this.luceneTopNLimit = v); clusterSettings.initializeAndWatch(INTERMEDIATE_LOCAL_RELATION_MAX_SIZE, v -> this.intermediateLocalRelationMaxSize = v); } @@ -92,13 +81,11 @@ public PlannerSettings(ClusterService clusterService) { public PlannerSettings( DataPartitioning defaultDataPartitioning, ByteSizeValue valuesLoadingJumboSize, - ByteSizeValue rateBufferSize, int luceneTopNLimit, ByteSizeValue intermediateLocalRelationMaxSize ) { this.defaultDataPartitioning = defaultDataPartitioning; this.valuesLoadingJumboSize = valuesLoadingJumboSize; - this.rateBufferSize = rateBufferSize; this.luceneTopNLimit = luceneTopNLimit; this.intermediateLocalRelationMaxSize = intermediateLocalRelationMaxSize; } @@ -132,11 +119,4 @@ public int luceneTopNLimit() { public ByteSizeValue intermediateLocalRelationMaxSize() { return intermediateLocalRelationMaxSize; } - - /** - * Returns the memory size allocated for buffering data points during rate calculations. - */ - public ByteSizeValue getRateBufferSize() { - return rateBufferSize; - } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 388dd0d48a137..85635a33c8fa1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -255,7 +255,6 @@ public List> getSettings() { ESQL_QUERYLOG_INCLUDE_USER_SETTING, PlannerSettings.DEFAULT_DATA_PARTITIONING, PlannerSettings.VALUES_LOADING_JUMBO_SIZE, - PlannerSettings.RATE_BUFFER_SIZE, PlannerSettings.LUCENE_TOPN_LIMIT, PlannerSettings.INTERMEDIATE_LOCAL_RELATION_MAX_SIZE, PlannerSettings.REDUCTION_LATE_MATERIALIZATION, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java index ae423443f6150..5a3a421660683 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java @@ -16,7 +16,6 @@ import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; -import org.elasticsearch.xpack.esql.planner.PlannerSettings; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.planner.mapper.Mapper; import org.elasticsearch.xpack.esql.plugin.EsqlFlags; @@ -77,10 +76,6 @@ private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats) { } private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats, EsqlFlags esqlFlags) { - return optimizedPlan(plan, TEST_PLANNER_SETTINGS, searchStats, esqlFlags); - } - - public PhysicalPlan optimizedPlan(PhysicalPlan plan, PlannerSettings plannerSettings, SearchStats searchStats, EsqlFlags esqlFlags) { // System.out.println("* Physical Before\n" + plan); var physicalPlan = EstimatesRowSize.estimateRowSize(0, physicalPlanOptimizer.optimize(plan)); // System.out.println("* Physical After\n" + physicalPlan); @@ -92,7 +87,7 @@ public PhysicalPlan optimizedPlan(PhysicalPlan plan, PlannerSettings plannerSett new LocalLogicalOptimizerContext(config, FoldContext.small(), searchStats) ); var physicalTestOptimizer = new TestLocalPhysicalPlanOptimizer( - new LocalPhysicalOptimizerContext(plannerSettings, esqlFlags, config, FoldContext.small(), searchStats), + new LocalPhysicalOptimizerContext(TEST_PLANNER_SETTINGS, esqlFlags, config, FoldContext.small(), searchStats), true ); var l = PlannerUtils.localPlan(physicalPlan, logicalTestOptimizer, physicalTestOptimizer, null); @@ -104,7 +99,7 @@ public PhysicalPlan optimizedPlan(PhysicalPlan plan, PlannerSettings plannerSett return l; } - public PhysicalPlan physicalPlan(String query, Analyzer analyzer) { + private PhysicalPlan physicalPlan(String query, Analyzer analyzer) { LogicalPlan logical = logicalOptimizer.optimize(analyzer.analyze(EsqlParser.INSTANCE.parseQuery(query))); // System.out.println("Logical\n" + logical); return mapper.map(new Versioned<>(logical, analyzer.context().minimumVersion())); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PartitionTimeSeriesTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PartitionTimeSeriesTests.java deleted file mode 100644 index 20ac3c3a5a65d..0000000000000 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PartitionTimeSeriesTests.java +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.esql.optimizer.rules.physical.local; - -import org.elasticsearch.TransportVersion; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.time.DateFormatter; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.compute.lucene.DataPartitioning; -import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexMode; -import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.IndexVersion; -import org.elasticsearch.index.mapper.DateFieldMapper; -import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.ExistsQueryBuilder; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.RangeQueryBuilder; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.esql.EsqlTestUtils; -import org.elasticsearch.xpack.esql.core.expression.FoldContext; -import org.elasticsearch.xpack.esql.optimizer.AbstractLocalPhysicalPlanOptimizerTests; -import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext; -import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer; -import org.elasticsearch.xpack.esql.optimizer.TestPlannerOptimizer; -import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; -import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; -import org.elasticsearch.xpack.esql.planner.PlannerSettings; -import org.elasticsearch.xpack.esql.planner.PlannerUtils; -import org.elasticsearch.xpack.esql.plugin.EsqlFlags; -import org.elasticsearch.xpack.esql.plugin.QueryPragmas; -import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery; -import org.elasticsearch.xpack.esql.session.Configuration; -import org.elasticsearch.xpack.esql.stats.SearchStats; - -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; -import java.util.Map; - -import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; -import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.instanceOf; - -public class PartitionTimeSeriesTests extends AbstractLocalPhysicalPlanOptimizerTests { - final DateFormatter fmt = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER; - - public PartitionTimeSeriesTests(String name, Configuration config) { - super(name, config); - } - - private void runTest( - long totalDocs, - String minTimestampFromData, - String maxTimestampFromData, - String filter, - ByteSizeValue rateBufferSize, - int taskConcurrency, - List expectedRanges - ) { - Map minValue = Map.of("@timestamp", fmt.parseMillis(minTimestampFromData)); - Map maxValue = Map.of("@timestamp", fmt.parseMillis(maxTimestampFromData)); - SearchStats searchStats = new EsqlTestUtils.TestSearchStatsWithMinMax(minValue, maxValue) { - @Override - public Map targetShards() { - var indexMetadata = IndexMetadata.builder("k8s") - .settings( - ESTestCase.indexSettings(IndexVersion.current(), 1, 1) - .put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES.name()) - ) - .build(); - return Map.of(new ShardId(new Index("id", "n/a"), 1), indexMetadata); - } - - @Override - public long count() { - return totalDocs; - } - }; - var plannerSettings = new PlannerSettings( - DataPartitioning.SHARD, - ByteSizeValue.ofMb(1), - rateBufferSize, - 10_000, - ByteSizeValue.ofMb(1) - ); - Configuration config = configuration( - new QueryPragmas( - Settings.builder().put(QueryPragmas.TASK_CONCURRENCY.getKey().toLowerCase(Locale.ROOT), taskConcurrency).build() - ) - ); - var planner = new TestPlannerOptimizer( - config, - timeSeriesAnalyzer, - new LogicalPlanOptimizer(new LogicalOptimizerContext(config, FoldContext.small(), TransportVersion.current())) - ); - String q = String.format(Locale.ROOT, """ - TS k8s - %s - | STATS max(rate(network.total_bytes_in)) BY cluster, BUCKET(@timestamp, 1 hour) - | LIMIT 10 - """, (filter != null ? "| WHERE " + filter : "")); - PhysicalPlan plan = planner.physicalPlan(q, timeSeriesAnalyzer); - plan = PlannerUtils.integrateEsFilterIntoFragment(plan, null); - plan = planner.optimizedPlan(plan, plannerSettings, searchStats, new EsqlFlags(true)); - EsQueryExec esQuery = (EsQueryExec) plan.collectFirstChildren(EsQueryExec.class::isInstance).getFirst(); - List actualRanges = new ArrayList<>(); - for (EsQueryExec.QueryBuilderAndTags qt : esQuery.queryBuilderAndTags()) { - BoolQueryBuilder boolQuery = (BoolQueryBuilder) qt.query(); - List filters = boolQuery.must(); - assertThat(filters, hasSize(2)); - assertThat(filters.get(0), instanceOf(ExistsQueryBuilder.class)); - SingleValueQuery.Builder singleValueQuery = as(filters.get(1), SingleValueQuery.Builder.class); - RangeQueryBuilder r = as(singleValueQuery.next(), RangeQueryBuilder.class); - assertThat(r.fieldName(), equalTo("@timestamp")); - assertTrue(r.includeLower()); - assertFalse(r.includeUpper()); - actualRanges.add(new QueryRange(r.from().toString(), r.to().toString())); - } - assertThat(actualRanges, equalTo(expectedRanges)); - } - - record QueryRange(String fromInclusive, String toInclusive) { - - } - - public void testPartition() { - runTest( - 20_000_000L, - "2023-10-20T12:15:03.360Z", - "2023-10-20T12:59:02.250Z", - null, - ByteSizeValue.ofMb(75), - 12, // 8 cpus - List.of( - new QueryRange("2023-10-20T12:15:03.360Z", "2023-10-20T12:20:00.000Z"), - new QueryRange("2023-10-20T12:20:00.000Z", "2023-10-20T12:25:00.000Z"), - new QueryRange("2023-10-20T12:25:00.000Z", "2023-10-20T12:30:00.000Z"), - new QueryRange("2023-10-20T12:30:00.000Z", "2023-10-20T12:35:00.000Z"), - new QueryRange("2023-10-20T12:35:00.000Z", "2023-10-20T12:40:00.000Z"), - new QueryRange("2023-10-20T12:40:00.000Z", "2023-10-20T12:45:00.000Z"), - new QueryRange("2023-10-20T12:45:00.000Z", "2023-10-20T12:50:00.000Z"), - new QueryRange("2023-10-20T12:50:00.000Z", "2023-10-20T12:55:00.000Z"), - new QueryRange("2023-10-20T12:55:00.000Z", "2023-10-20T12:59:02.251Z") - ) - ); - runTest( - 20_000_000L, - "2023-10-20T12:15:03.360Z", - "2023-10-20T12:59:02.250Z", - null, - ByteSizeValue.ofMb(75), - 6, // 4 cpus - List.of( - new QueryRange("2023-10-20T12:15:03.360Z", "2023-10-20T12:20:00.000Z"), - new QueryRange("2023-10-20T12:20:00.000Z", "2023-10-20T12:30:00.000Z"), - new QueryRange("2023-10-20T12:30:00.000Z", "2023-10-20T12:40:00.000Z"), - new QueryRange("2023-10-20T12:40:00.000Z", "2023-10-20T12:50:00.000Z"), - new QueryRange("2023-10-20T12:50:00.000Z", "2023-10-20T12:59:02.251Z") - ) - ); - - runTest( - 20_000_000L, - "2023-10-20T12:15:03.360Z", - "2023-10-20T12:59:02.250Z", - null, - ByteSizeValue.ofMb(75), - 3, // 2 cpus - List.of( - new QueryRange("2023-10-20T12:15:03.360Z", "2023-10-20T12:30:00.000Z"), - new QueryRange("2023-10-20T12:30:00.000Z", "2023-10-20T12:59:02.251Z") - ) - ); - - runTest( - 20_000_000L, - "2023-10-20T12:15:03.360Z", - "2023-10-20T12:59:02.250Z", - null, - ByteSizeValue.ofMb(30), // smaller buffer to more smaller slices - 3, // 2 cpus - List.of( - new QueryRange("2023-10-20T12:15:03.360Z", "2023-10-20T12:20:00.000Z"), - new QueryRange("2023-10-20T12:20:00.000Z", "2023-10-20T12:40:00.000Z"), - new QueryRange("2023-10-20T12:40:00.000Z", "2023-10-20T12:59:02.251Z") - ) - ); - runTest( - 20_000_000L, - "2023-10-20T12:15:03.360Z", - "2023-10-20T12:59:02.250Z", - null, - ByteSizeValue.ofMb(10), // smaller buffer to more smaller slices - 3, // 2 cpus - List.of( - new QueryRange("2023-10-20T12:15:03.360Z", "2023-10-20T12:20:00.000Z"), - new QueryRange("2023-10-20T12:20:00.000Z", "2023-10-20T12:25:00.000Z"), - new QueryRange("2023-10-20T12:25:00.000Z", "2023-10-20T12:30:00.000Z"), - new QueryRange("2023-10-20T12:30:00.000Z", "2023-10-20T12:35:00.000Z"), - new QueryRange("2023-10-20T12:35:00.000Z", "2023-10-20T12:40:00.000Z"), - new QueryRange("2023-10-20T12:40:00.000Z", "2023-10-20T12:45:00.000Z"), - new QueryRange("2023-10-20T12:45:00.000Z", "2023-10-20T12:50:00.000Z"), - new QueryRange("2023-10-20T12:50:00.000Z", "2023-10-20T12:55:00.000Z"), - new QueryRange("2023-10-20T12:55:00.000Z", "2023-10-20T12:59:02.251Z") - ) - ); - runTest( - 5_000_000L, // less docs - "2023-10-20T12:15:03.360Z", - "2023-10-20T12:59:02.250Z", - null, - ByteSizeValue.ofMb(10), // smaller buffer to more smaller slices - 3, // 2 cpus - List.of( - new QueryRange("2023-10-20T12:15:03.360Z", "2023-10-20T12:30:00.000Z"), - new QueryRange("2023-10-20T12:30:00.000Z", "2023-10-20T12:59:02.251Z") - ) - ); - } - - public void testPartitionWithQuery() { - runTest( - 20_000_000L, - "2023-11-20T12:15:03.360Z", - "2023-11-20T12:59:02.250Z", - null, - ByteSizeValue.ofMb(75), - 12, // 8 cpus - List.of( - new QueryRange("2023-11-20T12:15:03.360Z", "2023-11-20T12:20:00.000Z"), - new QueryRange("2023-11-20T12:20:00.000Z", "2023-11-20T12:25:00.000Z"), - new QueryRange("2023-11-20T12:25:00.000Z", "2023-11-20T12:30:00.000Z"), - new QueryRange("2023-11-20T12:30:00.000Z", "2023-11-20T12:35:00.000Z"), - new QueryRange("2023-11-20T12:35:00.000Z", "2023-11-20T12:40:00.000Z"), - new QueryRange("2023-11-20T12:40:00.000Z", "2023-11-20T12:45:00.000Z"), - new QueryRange("2023-11-20T12:45:00.000Z", "2023-11-20T12:50:00.000Z"), - new QueryRange("2023-11-20T12:50:00.000Z", "2023-11-20T12:55:00.000Z"), - new QueryRange("2023-11-20T12:55:00.000Z", "2023-11-20T12:59:02.251Z") - ) - ); - - String filter = randomFrom( - "\"2023-11-20T12:16:03.360Z\" <= @timestamp AND @timestamp <= \"2023-11-20T12:57:02.250Z\"", - "TRANGE(\"2023-11-20T12:16:03.359Z\", \"2023-11-20T12:57:02.250Z\")" - ); - runTest( - 20_000_000L, - "2023-11-20T12:15:03.360Z", - "2023-11-20T12:59:02.250Z", - filter, - ByteSizeValue.ofMb(75), - 12, // 8 cpus - List.of( - new QueryRange("2023-11-20T12:16:03.360Z", "2023-11-20T12:20:00.000Z"), - new QueryRange("2023-11-20T12:20:00.000Z", "2023-11-20T12:25:00.000Z"), - new QueryRange("2023-11-20T12:25:00.000Z", "2023-11-20T12:30:00.000Z"), - new QueryRange("2023-11-20T12:30:00.000Z", "2023-11-20T12:35:00.000Z"), - new QueryRange("2023-11-20T12:35:00.000Z", "2023-11-20T12:40:00.000Z"), - new QueryRange("2023-11-20T12:40:00.000Z", "2023-11-20T12:45:00.000Z"), - new QueryRange("2023-11-20T12:45:00.000Z", "2023-11-20T12:50:00.000Z"), - new QueryRange("2023-11-20T12:50:00.000Z", "2023-11-20T12:55:00.000Z"), - new QueryRange("2023-11-20T12:55:00.000Z", "2023-11-20T12:57:02.251Z") - ) - ); - } -} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/SubstituteRoundToTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/SubstituteRoundToTests.java index eb3c09c22ff59..caaeeabb3eecd 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/SubstituteRoundToTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/SubstituteRoundToTests.java @@ -1006,6 +1006,17 @@ public Map targetShards() { return Map.of(new ShardId(new Index("id", "n/a"), 1), indexMetadata); } }; + // enable filter-by-filter for rate aggregations + { + String q = """ + TS k8s + | STATS max(rate(network.total_bytes_in)) BY cluster, BUCKET(@timestamp, 1 hour) + | LIMIT 10 + """; + PhysicalPlan plan = plannerOptimizerTimeSeries.plan(q, searchStats, timeSeriesAnalyzer); + int queryAndTags = plainQueryAndTags(plan); + assertThat(queryAndTags, equalTo(4)); + } // disable filter-by-filter for non-rate aggregations { String q = """ diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index ac3fed8126cf7..d86d6747e2981 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -278,13 +278,7 @@ public void testTimeSeries() throws IOException { 10, null ); - PlannerSettings plannerSettings = new PlannerSettings( - DataPartitioning.AUTO, - ByteSizeValue.ofMb(1), - ByteSizeValue.ofMb(2), - 10_000, - ByteSizeValue.ofMb(1) - ); + PlannerSettings plannerSettings = new PlannerSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1), 10_000, ByteSizeValue.ofMb(1)); LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan( "test", FoldContext.small(),