Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1764 from grafana/aggregate2
Browse files Browse the repository at this point in the history
add function aggregate. PR #1751 + small tweaks
  • Loading branch information
Dieterbe authored Apr 13, 2020
2 parents 082486a + 6dfc29e commit 6c5fac7
Show file tree
Hide file tree
Showing 9 changed files with 347 additions and 42 deletions.
2 changes: 1 addition & 1 deletion docs/graphite.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ See also:
| Function name and signature | Alias | Metrictank |
| -------------------------------------------------------------- | ------------ | ---------- |
| absolute | | Stable |
| aggregate | | No |
| aggregate | | Stable |
| aggregateLine | | No |
| aggregateWithWildcards | | No |
| alias(seriesList, alias) seriesList | | Stable |
Expand Down
54 changes: 54 additions & 0 deletions expr/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ var a = []schema.Point{
{Val: 1234567890, Ts: 60},
}

var avgZeroa = []schema.Point{
{Val: 0, Ts: 10},
{Val: 0, Ts: 20},
{Val: 5.5, Ts: 30},
{Val: 0, Ts: 40},
{Val: 0, Ts: 50},
{Val: 1234567890, Ts: 60},
}

var b = []schema.Point{
{Val: 0, Ts: 10},
{Val: math.MaxFloat64, Ts: 20},
Expand Down Expand Up @@ -116,6 +125,15 @@ var avgab = []schema.Point{
{Val: 1234567890, Ts: 60},
}

var avgZeroab = []schema.Point{
{Val: 0, Ts: 10},
{Val: math.MaxFloat64 / 2, Ts: 20},
{Val: (math.MaxFloat64 - 14.5) / 2, Ts: 30},
{Val: 0, Ts: 40},
{Val: 617283945, Ts: 50},
{Val: 617283945, Ts: 60},
}

var avgabc = []schema.Point{
{Val: 0, Ts: 10},
{Val: math.MaxFloat64 / 3, Ts: 20},
Expand All @@ -125,6 +143,15 @@ var avgabc = []schema.Point{
{Val: float64(1234567894) / 2, Ts: 60},
}

var avgZeroabc = []schema.Point{
{Val: 0, Ts: 10},
{Val: math.MaxFloat64 / 3, Ts: 20},
{Val: (math.MaxFloat64 - 13.5) / 3, Ts: 30},
{Val: float64(2) / 3, Ts: 40},
{Val: float64(1234567893) / 3, Ts: 50},
{Val: float64(1234567894) / 3, Ts: 60},
}

var maxab = []schema.Point{
{Val: 0, Ts: 10},
{Val: math.MaxFloat64, Ts: 20},
Expand Down Expand Up @@ -251,6 +278,33 @@ var rangeabc = []schema.Point{
{Val: 1234567886, Ts: 60},
}

var counta = []schema.Point{
{Val: 1, Ts: 10},
{Val: 1, Ts: 20},
{Val: 1, Ts: 30},
{Val: math.NaN(), Ts: 40},
{Val: math.NaN(), Ts: 50},
{Val: 1, Ts: 60},
}

var countab = []schema.Point{
{Val: 2, Ts: 10},
{Val: 2, Ts: 20},
{Val: 2, Ts: 30},
{Val: math.NaN(), Ts: 40},
{Val: 1, Ts: 50},
{Val: 1, Ts: 60},
}

var countabc = []schema.Point{
{Val: 3, Ts: 10},
{Val: 3, Ts: 20},
{Val: 3, Ts: 30},
{Val: 1, Ts: 40},
{Val: 2, Ts: 50},
{Val: 2, Ts: 60},
}

// make sure we test with the correct data, don't mask if processing accidentally modifies our input data
func getCopy(in []schema.Point) []schema.Point {
out := make([]schema.Point, len(in))
Expand Down
59 changes: 48 additions & 11 deletions expr/func_aggregate.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package expr

import (
"math"
"strings"
"unsafe"

Expand All @@ -9,21 +10,35 @@ import (
)

type FuncAggregate struct {
in []GraphiteFunc
agg seriesAggregator
in []GraphiteFunc
name string
xFilesFactor float64
}

// NewAggregateConstructor takes an agg string and returns a constructor function
func NewAggregateConstructor(aggDescription string, aggFunc crossSeriesAggFunc) func() GraphiteFunc {
func NewAggregateConstructor(name string) func() GraphiteFunc {
return func() GraphiteFunc {
return &FuncAggregate{agg: seriesAggregator{function: aggFunc, name: aggDescription}}
return &FuncAggregate{name: name}
}
}

func NewAggregate() GraphiteFunc {
return &FuncAggregate{}
}

func (s *FuncAggregate) Signature() ([]Arg, []Arg) {
return []Arg{
ArgSeriesLists{val: &s.in},
}, []Arg{ArgSeries{}}
if s.name == "" {
return []Arg{
ArgSeriesLists{val: &s.in},
ArgString{val: &s.name, validator: []Validator{IsAggFunc}, key: "func"},
ArgFloat{val: &s.xFilesFactor, opt: true, key: "xFilesFactor"},
}, []Arg{ArgSeries{}}
} else {
return []Arg{
ArgSeriesLists{val: &s.in},
}, []Arg{ArgSeries{}}
}

}

func (s *FuncAggregate) Context(context Context) Context {
Expand All @@ -41,15 +56,31 @@ func (s *FuncAggregate) Exec(dataMap DataMap) ([]models.Series, error) {
return series, nil
}

agg := seriesAggregator{function: getCrossSeriesAggFunc(s.name), name: s.name}
series = Normalize(dataMap, series)
return aggregate(dataMap, series, queryPatts, agg, s.xFilesFactor)
}

// aggregate aggregates series using the requested aggregator and xFilesFactor and returns an output slice of length 1.
func aggregate(dataMap DataMap, series []models.Series, queryPatts []string, agg seriesAggregator, xFilesFactor float64) ([]models.Series, error) {
if len(series) == 1 {
name := s.agg.name + "Series(" + series[0].QueryPatt + ")"
name := agg.name + "Series(" + series[0].QueryPatt + ")"
series[0].Target = name
series[0].QueryPatt = name
return series, nil
}
out := pointSlicePool.Get().([]schema.Point)
series = Normalize(dataMap, series)
s.agg.function(series, &out)

agg.function(series, &out)

//remove values in accordance to xFilesFactor
if !skipCrossSeriesXff(xFilesFactor) {
for i := 0; i < len(series[0].Datapoints); i++ {
if !crossSeriesXff(series, i, xFilesFactor) {
out[i].Val = math.NaN()
}
}
}

// The tags for the aggregated series is only the tags that are
// common to all input series
Expand All @@ -67,7 +98,13 @@ func (s *FuncAggregate) Exec(dataMap DataMap) ([]models.Series, error) {
}

cons, queryCons := summarizeCons(series)
name := s.agg.name + "Series(" + strings.Join(queryPatts, ",") + ")"
name := agg.name + "Series(" + strings.Join(queryPatts, ",") + ")"

commonTags["aggregatedBy"] = agg.name
if _, ok := commonTags["name"]; !ok {
commonTags["name"] = name
}

output := series[0]
output.Target = name
output.QueryPatt = name
Expand Down
90 changes: 87 additions & 3 deletions expr/func_aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestAggregateIdentity(t *testing.T) {
},
getTargetSeries("averageSeries(single)", a),
t,
0,
)
testAggregate(
"identity",
Expand All @@ -32,6 +33,7 @@ func TestAggregateIdentity(t *testing.T) {
},
getTargetSeries("sumSeries(single)", a),
t,
0,
)
}
func TestAggregateQueryToSingle(t *testing.T) {
Expand All @@ -45,6 +47,7 @@ func TestAggregateQueryToSingle(t *testing.T) {
},
getTargetSeries("averageSeries(foo.*)", a),
t,
0,
)
}
func TestAggregateMultiple(t *testing.T) {
Expand All @@ -59,6 +62,7 @@ func TestAggregateMultiple(t *testing.T) {
},
getTargetSeries("averageSeries(foo.*)", avgab),
t,
0,
)
testAggregate(
"sum-multiple-series",
Expand All @@ -71,6 +75,7 @@ func TestAggregateMultiple(t *testing.T) {
},
getTargetSeries("sumSeries(foo.*)", sumab),
t,
0,
)
testAggregate(
"max-multiple-series",
Expand All @@ -83,6 +88,7 @@ func TestAggregateMultiple(t *testing.T) {
},
getTargetSeries("maxSeries(foo.*)", maxab),
t,
0,
)
}
func TestAggregateMultipleDiffQuery(t *testing.T) {
Expand All @@ -102,20 +108,23 @@ func TestAggregateMultipleDiffQuery(t *testing.T) {
input,
getTargetSeries("averageSeries(foo.*,movingAverage(bar, '1min'))", avgabc),
t,
0,
)
testAggregate(
"sum-multiple-serieslists",
"sum",
input,
getTargetSeries("sumSeries(foo.*,movingAverage(bar, '1min'))", sumabc),
t,
0,
)
testAggregate(
"max-multiple-serieslists",
"max",
input,
getTargetSeries("maxSeries(foo.*,movingAverage(bar, '1min'))", maxabc),
t,
0,
)
}

Expand Down Expand Up @@ -143,22 +152,97 @@ func TestAggregateMultipleTimesSameInput(t *testing.T) {
input,
getTargetSeries("averageSeries(foo.*,foo.*,a,a)", avg4a2b),
t,
0,
)
testAggregate(
"sum-multiple-times-same-input",
"sum",
input,
getTargetSeries("sumSeries(foo.*,foo.*,a,a)", sum4a2b),
t,
0,
)
}

func testAggregate(name, agg string, in [][]models.Series, out models.Series, t *testing.T) {
f := NewAggregateConstructor(agg, getCrossSeriesAggFunc(agg))()
func TestAggregateXFilesFactor(t *testing.T) {
input := [][]models.Series{
{
getQuerySeries("foo.*", a),
getQuerySeries("foo.*", b),
getQuerySeries("foo.*", c),
},
}

var avgabcxff05 = []schema.Point{
{Val: 0, Ts: 10},
{Val: math.MaxFloat64 / 3, Ts: 20},
{Val: (math.MaxFloat64 - 13.5) / 3, Ts: 30},
{Val: math.NaN(), Ts: 40},
{Val: float64(1234567893) / 2, Ts: 50},
{Val: float64(1234567894) / 2, Ts: 60},
}

var avgabcxff075 = []schema.Point{
{Val: 0, Ts: 10},
{Val: math.MaxFloat64 / 3, Ts: 20},
{Val: (math.MaxFloat64 - 13.5) / 3, Ts: 30},
{Val: math.NaN(), Ts: 40},
{Val: math.NaN() / 2, Ts: 50},
{Val: math.NaN() / 2, Ts: 60},
}

testAggregate(
"xFilesFactor-0",
"average",
input,
getTargetSeries("averageSeries(foo.*)", avgabc),
t,
0,
)
testAggregate(
"xFilesFactor-0.25",
"average",
input,
getTargetSeries("averageSeries(foo.*)", avgabc),
t,
0.25,
)

testAggregate(
"xFilesFactor-0.5",
"average",
input,
getTargetSeries("averageSeries(foo.*)", avgabcxff05),
t,
0.5,
)

testAggregate(
"xFilesFactor-0.75",
"average",
input,
getTargetSeries("averageSeries(foo.*)", avgabcxff075),
t,
0.75,
)

testAggregate(
"xFilesFactor-1",
"average",
input,
getTargetSeries("averageSeries(foo.*)", avgabcxff075),
t,
1,
)
}

func testAggregate(name, agg string, in [][]models.Series, out models.Series, t *testing.T, xFilesFactor float64) {
f := NewAggregateConstructor(agg)()
avg := f.(*FuncAggregate)
for _, i := range in {
avg.in = append(avg.in, NewMock(i))
}
avg.xFilesFactor = xFilesFactor
got, err := f.Exec(make(map[Req][]models.Series))
if err != nil {
t.Fatalf("case %q: err should be nil. got %q", name, err)
Expand Down Expand Up @@ -237,7 +321,7 @@ func benchmarkAggregate(b *testing.B, numSeries int, fn0, fn1 func() []schema.Po
b.ResetTimer()
var err error
for i := 0; i < b.N; i++ {
f := NewAggregateConstructor("average", crossSeriesAvg)()
f := NewAggregateConstructor("average")()
avg := f.(*FuncAggregate)
avg.in = append(avg.in, NewMock(input))
results, err = f.Exec(make(map[Req][]models.Series))
Expand Down
Loading

0 comments on commit 6c5fac7

Please sign in to comment.