Skip to content

Commit 3716d82

Browse files
authored
feat(promql): support prom nested function and aggregation push down (openGemini#588)
Signed-off-by: Jack Liu <[email protected]>
1 parent d62c032 commit 3716d82

File tree

11 files changed

+318
-61
lines changed

11 files changed

+318
-61
lines changed

engine/executor/heu_rule.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -315,8 +315,9 @@ func (r *AggPushdownToExchangeRule) OnMatch(call *OptRuleCall) {
315315
return
316316
}
317317

318-
// the calculation of prom function with range vector selector is only pushed down to series, not exchange.
319-
if exchange.schema.Options().IsRangeVectorSelector() {
318+
// the calculation of prom function with range vector selector is only pushed down to series
319+
// however the agg in the prom nested call needs to be pushed down to the exchange.
320+
if exchange.schema.Options().IsRangeVectorSelector() && !exchange.Schema().HasPromNestedCall() {
320321
return
321322
}
322323

@@ -399,12 +400,13 @@ func (r *AggPushdownToReaderRule) OnMatch(call *OptRuleCall) {
399400
}
400401

401402
canSlidingWindowPushDown := reader.Schema().HasSlidingWindowCall() && sysconfig.GetEnableSlidingWindowPushUp() != sysconfig.OnSlidingWindowPushUp
402-
if !reader.Schema().CanCallsPushdown() || (!reader.Schema().HasPercentileOGSketch() && !canSlidingWindowPushDown) {
403+
if !reader.Schema().CanCallsPushdown() || (!reader.Schema().HasPercentileOGSketch() && !canSlidingWindowPushDown && !reader.Schema().Options().IsPromQuery()) {
403404
return
404405
}
405406

406-
// the calculation of prom function with range vector selector is only pushed down to series, not reader.
407-
if reader.schema.Options().IsRangeVectorSelector() {
407+
// the calculation of prom function with range vector selector is only pushed down to series
408+
// however the agg in the prom nested call needs to be pushed down to the reader.
409+
if reader.schema.Options().IsRangeVectorSelector() && !reader.Schema().HasPromNestedCall() {
408410
return
409411
}
410412

engine/executor/heu_rule_test.go

+53-19
Original file line numberDiff line numberDiff line change
@@ -1438,25 +1438,7 @@ func TestIncHashAggRule(t *testing.T) {
14381438
}
14391439
}
14401440

1441-
func TestAggPushDownWithRangeVector(t *testing.T) {
1442-
fields := influxql.Fields{
1443-
&influxql.Field{
1444-
Expr: &influxql.Call{
1445-
Name: "count",
1446-
Args: []influxql.Expr{
1447-
&influxql.VarRef{
1448-
Val: "value",
1449-
Type: influxql.Float,
1450-
},
1451-
},
1452-
},
1453-
},
1454-
}
1455-
columnsName := []string{"count"}
1456-
opt := query.ProcessorOptions{PromQuery: true, Range: time.Minute, Step: time.Second}
1457-
opt.Interval.Duration = 1000
1458-
1459-
schema := executor.NewQuerySchema(fields, columnsName, &opt, nil)
1441+
func testAggPushDownBase(t *testing.T, schema *executor.QuerySchema) *AggPushDownVerifier {
14601442
planBuilder := executor.NewLogicalPlanBuilderImpl(schema)
14611443

14621444
var plan hybridqp.QueryNode
@@ -1506,10 +1488,62 @@ func TestAggPushDownWithRangeVector(t *testing.T) {
15061488

15071489
verifier := NewAggPushDownVerifier()
15081490
hybridqp.WalkQueryNodeInPreOrder(verifier, best)
1491+
return verifier
1492+
}
1493+
1494+
func TestAggPushDownWithRangeVector(t *testing.T) {
1495+
fields := influxql.Fields{
1496+
&influxql.Field{
1497+
Expr: &influxql.Call{
1498+
Name: "count",
1499+
Args: []influxql.Expr{
1500+
&influxql.VarRef{
1501+
Val: "value",
1502+
Type: influxql.Float,
1503+
},
1504+
},
1505+
},
1506+
},
1507+
}
1508+
columnsName := []string{"count"}
1509+
opt := query.ProcessorOptions{PromQuery: true, Range: time.Minute, Step: time.Second}
1510+
opt.Interval.Duration = 1000
1511+
1512+
schema := executor.NewQuerySchema(fields, columnsName, &opt, nil)
1513+
verifier := testAggPushDownBase(t, schema)
15091514
if verifier.AggCount() != 4 && !executor.GetEnableFileCursor() {
15101515
t.Errorf("4 agg in plan tree, but %d", verifier.AggCount())
15111516
}
15121517
if verifier.AggCount() != 2 && executor.GetEnableFileCursor() {
15131518
t.Errorf("2 agg in plan tree, but %d", verifier.AggCount())
15141519
}
15151520
}
1521+
1522+
func TestAggPushDownWithPromNestedCall(t *testing.T) {
1523+
fields := influxql.Fields{
1524+
&influxql.Field{
1525+
Expr: &influxql.Call{
1526+
Name: "count",
1527+
Args: []influxql.Expr{
1528+
&influxql.Call{
1529+
Name: "rate_prom",
1530+
Args: []influxql.Expr{
1531+
&influxql.VarRef{Val: "value", Type: influxql.Float},
1532+
},
1533+
},
1534+
},
1535+
},
1536+
},
1537+
}
1538+
columnsName := []string{"count"}
1539+
opt := query.ProcessorOptions{PromQuery: true, Range: time.Minute, Step: time.Second}
1540+
opt.Interval.Duration = 1000
1541+
schema := executor.NewQuerySchema(fields, columnsName, &opt, nil)
1542+
verifier := testAggPushDownBase(t, schema)
1543+
if verifier.AggCount() != 4 && !executor.GetEnableFileCursor() {
1544+
t.Errorf("4 agg in plan tree, but %d", verifier.AggCount())
1545+
}
1546+
if verifier.AggCount() != 5 && executor.GetEnableFileCursor() {
1547+
t.Errorf("5 agg in plan tree, but %d", verifier.AggCount())
1548+
}
1549+
}

engine/executor/logic_plan.go

+42-1
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,7 @@ func (p *LogicalIncHashAgg) Digest() string {
553553
type LogicalAggregate struct {
554554
isCountDistinct bool
555555
isPercentileOGSketch bool
556+
isPromNestedCall bool
556557
aggType int
557558
calls map[string]*influxql.Call
558559
callsOrder []string
@@ -571,6 +572,7 @@ func NewLogicalAggregate(input hybridqp.QueryNode, schema hybridqp.Catalog) *Log
571572
callsOrder: make([]string, 0, len(schema.Calls())),
572573
isCountDistinct: false,
573574
isPercentileOGSketch: schema.HasPercentileOGSketch(),
575+
isPromNestedCall: schema.HasPromNestedCall(),
574576
LogicalPlanSingle: *NewLogicalPlanSingle(input, schema),
575577
}
576578

@@ -640,6 +642,26 @@ func (p *LogicalAggregate) inferAggLevel() AggLevel {
640642
return SourceLevel
641643
}
642644

645+
func (p *LogicalAggregate) inferPromCallLevel() AggLevel {
646+
_, ok := p.Children()[0].(*HeuVertex)
647+
if !ok {
648+
_, ok = p.Children()[0].(*LogicalSeries)
649+
if ok {
650+
return SourceLevel
651+
}
652+
return SinkLevel
653+
}
654+
_, ok = p.Children()[0].(*HeuVertex).node.(*LogicalExchange)
655+
if ok {
656+
return SinkLevel
657+
}
658+
_, ok = p.Children()[0].(*HeuVertex).node.(*LogicalSeries)
659+
if ok {
660+
return SourceLevel
661+
}
662+
return SinkLevel
663+
}
664+
643665
func (p *LogicalAggregate) getOGSketchOp(k string, level AggLevel, cc map[string]*hybridqp.OGSketchCompositeOperator) *influxql.Call {
644666
switch level {
645667
case SourceLevel:
@@ -653,6 +675,13 @@ func (p *LogicalAggregate) getOGSketchOp(k string, level AggLevel, cc map[string
653675
}
654676
}
655677

678+
func (p *LogicalAggregate) getPromNestedCall(level AggLevel, call *hybridqp.PromNestedCall) *influxql.Call {
679+
if level == SourceLevel {
680+
return call.GetFuncCall()
681+
}
682+
return call.GetAggCall()
683+
}
684+
656685
func (p *LogicalAggregate) DeriveOperations() {
657686
if p.isCountDistinct {
658687
p.initCountDistinct()
@@ -692,6 +721,11 @@ func (p *LogicalAggregate) init() {
692721
level = p.inferAggLevel()
693722
cc = p.schema.CompositeCall()
694723
}
724+
var pc map[string]*hybridqp.PromNestedCall
725+
if p.isPromNestedCall {
726+
level = p.inferPromCallLevel()
727+
pc = p.schema.PromNestedCall()
728+
}
695729
refs := p.inputs[0].RowDataType().MakeRefs()
696730

697731
m := make(map[string]influxql.VarRef)
@@ -706,6 +740,13 @@ func (p *LogicalAggregate) init() {
706740
if p.isPercentileOGSketch && c.Name == PercentileOGSketch {
707741
c = p.getOGSketchOp(k, level, cc)
708742
ref = p.schema.Mapping()[c]
743+
} else if p.isPromNestedCall && p.schema.IsPromNestedCall(c) {
744+
kc, ok := pc[k]
745+
if !ok {
746+
continue
747+
}
748+
c = p.getPromNestedCall(level, kc)
749+
ref = p.schema.Mapping()[c]
709750
} else {
710751
ref = p.schema.Mapping()[p.schema.Calls()[k]]
711752
}
@@ -747,7 +788,7 @@ func (p *LogicalAggregate) init() {
747788
}
748789

749790
func (p *LogicalAggregate) ForwardCallArgs() {
750-
if p.isPercentileOGSketch {
791+
if p.isPercentileOGSketch || p.isPromNestedCall {
751792
return
752793
}
753794
for k, call := range p.calls {

0 commit comments

Comments
 (0)