Skip to content

Commit 2229243

Browse files
authored
feat(engine): rate() aggregate function (#19589)
1 parent 4d34826 commit 2229243

File tree

3 files changed

+71
-18
lines changed

3 files changed

+71
-18
lines changed

pkg/engine/internal/planner/logical/planner.go

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -259,15 +259,45 @@ func walkRangeAggregation(e *syntax.RangeAggregationExpr, params logql.Params) (
259259
builder = builder.Cast(unwrapIdentifier, unwrapOperation)
260260
}
261261

262-
rangeAggType := convertRangeAggregationType(e.Operation)
263-
if rangeAggType == types.RangeAggregationTypeInvalid {
262+
var rangeAggType types.RangeAggregationType
263+
switch e.Operation {
264+
case syntax.OpRangeTypeCount:
265+
rangeAggType = types.RangeAggregationTypeCount
266+
case syntax.OpRangeTypeSum:
267+
rangeAggType = types.RangeAggregationTypeSum
268+
//case syntax.OpRangeTypeMax:
269+
// rangeAggType = types.RangeAggregationTypeMax
270+
//case syntax.OpRangeTypeMin:
271+
// rangeAggType = types.RangeAggregationTypeMin
272+
//case syntax.OpRangeTypeBytesRate:
273+
// rangeAggType = types.RangeAggregationTypeBytes // bytes_rate is implemented as bytes_over_time/$interval
274+
case syntax.OpRangeTypeRate:
275+
if e.Left.Unwrap != nil {
276+
rangeAggType = types.RangeAggregationTypeSum // rate of an unwrap is implemented as sum_over_time/$interval
277+
} else {
278+
rangeAggType = types.RangeAggregationTypeCount // rate is implemented as count_over_time/$interval
279+
}
280+
default:
264281
return nil, errUnimplemented
265282
}
266283

267284
builder = builder.RangeAggregation(
268285
nil, rangeAggType, params.Start(), params.End(), params.Step(), rangeInterval,
269286
)
270287

288+
switch e.Operation {
289+
//case syntax.OpRangeTypeBytesRate:
290+
// // bytes_rate is implemented as bytes_over_time/$interval
291+
// builder = builder.BinOpRight(types.BinaryOpDiv, &Literal{
292+
// Literal: NewLiteral(rangeInterval.Seconds()),
293+
// })
294+
case syntax.OpRangeTypeRate:
295+
// rate is implemented as count_over_time/$interval
296+
builder = builder.BinOpRight(types.BinaryOpDiv, &Literal{
297+
Literal: NewLiteral(rangeInterval.Seconds()),
298+
})
299+
}
300+
271301
return builder.Value(), nil
272302
}
273303

@@ -426,21 +456,6 @@ func convertVectorAggregationType(op string) types.VectorAggregationType {
426456
}
427457
}
428458

429-
func convertRangeAggregationType(op string) types.RangeAggregationType {
430-
switch op {
431-
case syntax.OpRangeTypeCount:
432-
return types.RangeAggregationTypeCount
433-
case syntax.OpRangeTypeSum:
434-
return types.RangeAggregationTypeSum
435-
//case syntax.OpRangeTypeMax:
436-
// return types.RangeAggregationTypeMax
437-
//case syntax.OpRangeTypeMin:
438-
// return types.RangeAggregationTypeMin
439-
default:
440-
return types.RangeAggregationTypeInvalid
441-
}
442-
}
443-
444459
func convertMatcherType(t labels.MatchType) types.BinaryOp {
445460
switch t {
446461
case labels.MatchEqual:

pkg/engine/internal/planner/logical/planner_test.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,41 @@ RETURN %12
193193

194194
t.Logf("\n%s\n", sb.String())
195195
})
196+
197+
t.Run(`rate metric query with nested math expression`, func(t *testing.T) {
198+
q := &query{
199+
statement: `sum by (level) ((rate({cluster="prod"}[5m]) - 100) ^ 2)`,
200+
start: 3600,
201+
end: 7200,
202+
interval: 5 * time.Minute,
203+
}
204+
205+
logicalPlan, err := BuildPlan(q)
206+
require.NoError(t, err)
207+
t.Logf("\n%s\n", logicalPlan.String())
208+
209+
expected := `%1 = EQ label.cluster "prod"
210+
%2 = MAKETABLE [selector=%1, predicates=[], shard=0_of_1]
211+
%3 = GTE builtin.timestamp 1970-01-01T00:55:00Z
212+
%4 = SELECT %2 [predicate=%3]
213+
%5 = LT builtin.timestamp 1970-01-01T02:00:00Z
214+
%6 = SELECT %4 [predicate=%5]
215+
%7 = RANGE_AGGREGATION %6 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s]
216+
%8 = DIV %7 300
217+
%9 = SUB %8 100
218+
%10 = POW %9 2
219+
%11 = VECTOR_AGGREGATION %10 [operation=sum, group_by=(ambiguous.level)]
220+
%12 = LOGQL_COMPAT %11
221+
RETURN %12
222+
`
223+
224+
require.Equal(t, expected, logicalPlan.String())
225+
226+
var sb strings.Builder
227+
PrintTree(&sb, logicalPlan.Value())
228+
229+
t.Logf("\n%s\n", sb.String())
230+
})
196231
}
197232

198233
func TestCanExecuteQuery(t *testing.T) {
@@ -284,8 +319,8 @@ func TestCanExecuteQuery(t *testing.T) {
284319
expected: true,
285320
},
286321
{
287-
// rate is not supported
288322
statement: `sum by (level) (rate({env="prod"}[1m]))`,
323+
expected: true,
289324
},
290325
{
291326
// max is not supported

pkg/engine/internal/types/aggregations.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const (
1010
RangeAggregationTypeSum // Represents sum_over_time range aggregation
1111
RangeAggregationTypeMax // Represents max_over_time range aggregation
1212
RangeAggregationTypeMin // Represents min_over_time range aggregation
13+
RangeAggregationTypeBytes // Represents bytes_over_time range aggregation
1314
)
1415

1516
func (op RangeAggregationType) String() string {
@@ -22,6 +23,8 @@ func (op RangeAggregationType) String() string {
2223
return "max"
2324
case RangeAggregationTypeMin:
2425
return "min"
26+
case RangeAggregationTypeBytes:
27+
return "bytes"
2528
default:
2629
return "invalid"
2730
}

0 commit comments

Comments
 (0)