Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

groupByTags Performance improvements + fix setting consolidator per group #1165

Merged
merged 5 commits into from
Dec 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions expr/func_alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (s *FuncAlias) Exec(cache map[Req][]models.Series) ([]models.Series, error)
for i := range series {
series[i].Target = s.alias
series[i].QueryPatt = s.alias
series[i].Tags["name"] = s.alias
}
return series, nil
}
3 changes: 3 additions & 0 deletions expr/func_alias_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ func testAlias(name string, in []models.Series, out []models.Series, t *testing.
if o.Target != g.Target {
t.Fatalf("case %q: expected target %q, got %q", name, o.Target, g.Target)
}
if o.Target != g.Tags["name"] {
t.Fatalf("case %q: expected target to match name tag but target = %q, tag = %q", name, o.Target, g.Tags["name"])
}
if len(o.Datapoints) != len(g.Datapoints) {
t.Fatalf("case %q: len output expected %d, got %d", name, len(o.Datapoints), len(g.Datapoints))
}
Expand Down
1 change: 1 addition & 0 deletions expr/func_aliasbynode.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (s *FuncAliasByNode) Exec(cache map[Req][]models.Series) ([]models.Series,
n := aggKey(serie, s.nodes)
series[i].Target = n
series[i].QueryPatt = n
series[i].Tags["name"] = n
}
return series, nil
}
1 change: 1 addition & 0 deletions expr/func_aliassub.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (s *FuncAliasSub) Exec(cache map[Req][]models.Series) ([]models.Series, err
name := s.search.ReplaceAllString(metric, replace)
series[i].Target = name
series[i].QueryPatt = name
series[i].Tags["name"] = name
}
return series, err
}
3 changes: 3 additions & 0 deletions expr/func_aliassub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ func TestAliasSub(t *testing.T) {
if o != g.Target {
t.Fatalf("case %d: expected target %q, got %q", i, o, g.Target)
}
if o != g.Tags["name"] {
t.Fatalf("case %d: expected name tag %q, got %q", i, o, g.Tags["name"])
}
}
}
}
Expand Down
15 changes: 9 additions & 6 deletions expr/func_groupbytags.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"errors"
"sort"
"strings"

"github.com/grafana/metrictank/api/models"
"github.com/raintank/schema"
Expand Down Expand Up @@ -38,6 +37,10 @@ func (s *FuncGroupByTags) Exec(cache map[Req][]models.Series) ([]models.Series,
return nil, err
}

if len(series) == 0 {
return series, nil
}

if len(s.tags) == 0 {
return nil, errors.New("No tags specified")
}
Expand All @@ -59,7 +62,7 @@ func (s *FuncGroupByTags) Exec(cache map[Req][]models.Series) ([]models.Series,
if !useName {
// if all series have the same name, name becomes one of our tags
for _, serie := range series {
thisName := strings.Split(serie.Target, ";")[0]
thisName := serie.Tags["name"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right on.
it is probably not clear, but it looks like we do try to ensure the name tag is always set for any series (FuncGet sets it initially and any other function that creates new series should set it as well I think)

if nameReplace == "" {
nameReplace = thisName
} else if nameReplace != thisName {
Expand All @@ -75,12 +78,10 @@ func (s *FuncGroupByTags) Exec(cache map[Req][]models.Series) ([]models.Series,
// First pass - group our series together by key
var buffer bytes.Buffer
for _, serie := range series {
name := strings.SplitN(serie.Target, ";", 2)[0]

buffer.Reset()

if useName {
buffer.WriteString(name)
buffer.WriteString(serie.Tags["name"])
} else {
buffer.WriteString(nameReplace)
}
Expand All @@ -106,7 +107,8 @@ func (s *FuncGroupByTags) Exec(cache map[Req][]models.Series) ([]models.Series,

// Now, for each key perform the requested aggregation
for name, groupSeries := range groups {
cons, queryCons := summarizeCons(series)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems incorrect. it's possible for input series to have different consolidators defined, or am I missing something?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true, but I think that's true with many of the aggregation functions and it seems first come first serve.

In general, I think grouping here would be very awkward with mixing consolidators and the user is likely doing something...odd.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually the code in master also seems wrong.
seems to me the most correct way is for each group to do cons, queryCons := summarizeCons(groupSeries)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is more correct, though in the case that the groups are mixed, there is still an arbitrary consolidator chosen. But when 'name' is one of the groupByTags, I could see this working better

cons, queryCons := summarizeCons(groupSeries)

newSeries := models.Series{
Target: name,
QueryPatt: name,
Expand All @@ -117,6 +119,7 @@ func (s *FuncGroupByTags) Exec(cache map[Req][]models.Series) ([]models.Series,
newSeries.SetTags()

newSeries.Datapoints = pointSlicePool.Get().([]schema.Point)

aggFunc(groupSeries, &newSeries.Datapoints)
cache[Req{}] = append(cache[Req{}], newSeries)

Expand Down
140 changes: 101 additions & 39 deletions expr/func_groupbytags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestGroupByTagsSingleGroupByName(t *testing.T) {
getModel("name1", sumabc),
}

testGroupByTags("MultipleSeriesMultipleResultsMultipleNamesMoreTags", in, out, "sum", []string{"name"}, nil, t)
testGroupByTags("SingleGroupByName", in, out, "sum", []string{"name"}, nil, t)
}

func TestGroupByTagsMultipleGroupByName(t *testing.T) {
Expand All @@ -148,7 +148,7 @@ func TestGroupByTagsMultipleGroupByName(t *testing.T) {
getModel("name2", sumcd),
}

testGroupByTags("MultipleSeriesMultipleResultsMultipleNamesMoreTags", in, out, "sum", []string{"name"}, nil, t)
testGroupByTags("MultipleGroupByName", in, out, "sum", []string{"name"}, nil, t)
}

func TestGroupByTagsMultipleSeriesMissingTag(t *testing.T) {
Expand All @@ -163,7 +163,7 @@ func TestGroupByTagsMultipleSeriesMissingTag(t *testing.T) {
getModel("name2;missingTag=;tag1=val1_1", sumcd),
}

testGroupByTags("MultipleSeriesMultipleResultsGroupByName", in, out, "sum", []string{"tag1", "name", "missingTag"}, nil, t)
testGroupByTags("MultipleSeriesMissingTag", in, out, "sum", []string{"tag1", "name", "missingTag"}, nil, t)
}

func TestGroupByTagsAllAggregators(t *testing.T) {
Expand Down Expand Up @@ -257,62 +257,114 @@ func testGroupByTags(name string, in []models.Series, out []models.Series, agg s
}
}

func BenchmarkGroupByTags10k_1NoNulls(b *testing.B) {
benchmarkGroupByTags(b, 1, test.RandFloats10k, test.RandFloats10k)
// Benchmarks:

// input series: 1, 10, 100, 1k, 10k, 100k
// output series: 1, same as input, then if applicable: 10, 100, 1k, 10k

// 1 input series
// BenchmarkGroupByTags1in1out measures groupByTags with 1 input series aggregated into 1 output group.
func BenchmarkGroupByTags1in1out(b *testing.B) {
benchmarkGroupByTags(b, 1, 1)
}

// 10 input series
// BenchmarkGroupByTags10in1out measures groupByTags with 10 input series aggregated into 1 output group.
func BenchmarkGroupByTags10in1out(b *testing.B) {
benchmarkGroupByTags(b, 10, 1)
}

// BenchmarkGroupByTags10in10out measures groupByTags with 10 input series aggregated into 10 output groups.
func BenchmarkGroupByTags10in10out(b *testing.B) {
benchmarkGroupByTags(b, 10, 10)
}

// 100 input series
// BenchmarkGroupByTags100in1out measures groupByTags with 100 input series aggregated into 1 output group.
func BenchmarkGroupByTags100in1out(b *testing.B) {
benchmarkGroupByTags(b, 100, 1)
}

// BenchmarkGroupByTags100in10out measures groupByTags with 100 input series aggregated into 10 output groups.
func BenchmarkGroupByTags100in10out(b *testing.B) {
benchmarkGroupByTags(b, 100, 10)
}

// BenchmarkGroupByTags100in100out measures groupByTags with 100 input series aggregated into 100 output groups.
func BenchmarkGroupByTags100in100out(b *testing.B) {
benchmarkGroupByTags(b, 100, 100)
}

// 1k input series
// BenchmarkGroupByTags1000in1out measures groupByTags with 1k input series aggregated into 1 output group.
func BenchmarkGroupByTags1000in1out(b *testing.B) {
benchmarkGroupByTags(b, 1000, 1)
}

// BenchmarkGroupByTags1000in10out measures groupByTags with 1k input series aggregated into 10 output groups.
func BenchmarkGroupByTags1000in10out(b *testing.B) {
benchmarkGroupByTags(b, 1000, 10)
}

// BenchmarkGroupByTags1000in100out measures groupByTags with 1k input series aggregated into 100 output groups.
func BenchmarkGroupByTags1000in100out(b *testing.B) {
benchmarkGroupByTags(b, 1000, 100)
}

// BenchmarkGroupByTags1000in1000out measures groupByTags with 1k input series aggregated into 1k output groups.
func BenchmarkGroupByTags1000in1000out(b *testing.B) {
benchmarkGroupByTags(b, 1000, 1000)
}
func BenchmarkGroupByTags10k_10NoNulls(b *testing.B) {
benchmarkGroupByTags(b, 10, test.RandFloats10k, test.RandFloats10k)

// 10k input series
// BenchmarkGroupByTags10000in1out measures groupByTags with 10k input series aggregated into 1 output group.
func BenchmarkGroupByTags10000in1out(b *testing.B) {
benchmarkGroupByTags(b, 10000, 1)
}
func BenchmarkGroupByTags10k_100NoNulls(b *testing.B) {
benchmarkGroupByTags(b, 100, test.RandFloats10k, test.RandFloats10k)

// BenchmarkGroupByTags10000in10out measures groupByTags with 10k input series aggregated into 10 output groups.
func BenchmarkGroupByTags10000in10out(b *testing.B) {
benchmarkGroupByTags(b, 10000, 10)
}
func BenchmarkGroupByTags10k_1000NoNulls(b *testing.B) {
benchmarkGroupByTags(b, 1000, test.RandFloats10k, test.RandFloats10k)

// BenchmarkGroupByTags10000in100out measures groupByTags with 10k input series aggregated into 100 output groups.
func BenchmarkGroupByTags10000in100out(b *testing.B) {
benchmarkGroupByTags(b, 10000, 100)
}

func BenchmarkGroupByTags10k_1SomeSeriesHalfNulls(b *testing.B) {
benchmarkGroupByTags(b, 1, test.RandFloats10k, test.RandFloatsWithNulls10k)
// BenchmarkGroupByTags10000in1000out measures groupByTags with 10k input series aggregated into 1k output groups.
func BenchmarkGroupByTags10000in1000out(b *testing.B) {
benchmarkGroupByTags(b, 10000, 1000)
}
func BenchmarkGroupByTags10k_10SomeSeriesHalfNulls(b *testing.B) {
benchmarkGroupByTags(b, 10, test.RandFloats10k, test.RandFloatsWithNulls10k)

// BenchmarkGroupByTags10000in10000out measures groupByTags with 10k input series aggregated into 10k output groups.
func BenchmarkGroupByTags10000in10000out(b *testing.B) {
benchmarkGroupByTags(b, 10000, 10000)
}
func BenchmarkGroupByTags10k_100SomeSeriesHalfNulls(b *testing.B) {
benchmarkGroupByTags(b, 100, test.RandFloats10k, test.RandFloatsWithNulls10k)

// 100k input series
// BenchmarkGroupByTags100000in1out measures groupByTags with 100k input series aggregated into 1 output group.
func BenchmarkGroupByTags100000in1out(b *testing.B) {
benchmarkGroupByTags(b, 100000, 1)
}
func BenchmarkGroupByTags10k_1000SomeSeriesHalfNulls(b *testing.B) {
benchmarkGroupByTags(b, 1000, test.RandFloats10k, test.RandFloatsWithNulls10k)

// BenchmarkGroupByTags100000in10out measures groupByTags with 100k input series aggregated into 10 output groups.
func BenchmarkGroupByTags100000in10out(b *testing.B) {
benchmarkGroupByTags(b, 100000, 10)
}

func BenchmarkGroupByTags10k_1AllSeriesHalfNulls(b *testing.B) {
benchmarkGroupByTags(b, 1, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
// BenchmarkGroupByTags100000in100out measures groupByTags with 100k input series aggregated into 100 output groups.
func BenchmarkGroupByTags100000in100out(b *testing.B) {
benchmarkGroupByTags(b, 100000, 100)
}
func BenchmarkGroupByTags10k_10AllSeriesHalfNulls(b *testing.B) {
benchmarkGroupByTags(b, 10, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)

// BenchmarkGroupByTags100000in1000out measures groupByTags with 100k input series aggregated into 1k output groups.
func BenchmarkGroupByTags100000in1000out(b *testing.B) {
benchmarkGroupByTags(b, 100000, 1000)
}
func BenchmarkGroupByTags10k_100AllSeriesHalfNulls(b *testing.B) {
benchmarkGroupByTags(b, 100, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)

// BenchmarkGroupByTags100000in10000out measures groupByTags with 100k input series aggregated into 10k output groups.
func BenchmarkGroupByTags100000in10000out(b *testing.B) {
benchmarkGroupByTags(b, 100000, 10000)
}
func BenchmarkGroupByTags10k_1000AllSeriesHalfNulls(b *testing.B) {
benchmarkGroupByTags(b, 1000, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)

// BenchmarkGroupByTags100000in100000out measures groupByTags with 100k input series aggregated into 100k output groups.
func BenchmarkGroupByTags100000in100000out(b *testing.B) {
benchmarkGroupByTags(b, 100000, 100000)
}

func benchmarkGroupByTags(b *testing.B, numSeries int, fn0, fn1 func() []schema.Point) {
func benchmarkGroupByTags(b *testing.B, numInputSeries, numOutputSeries int) {
var input []models.Series
tagValues := []string{"tag1", "tag2", "tag3", "tag4"}
for i := 0; i < numSeries; i++ {
tags := make(map[string]string, len(tagValues))

for t, tag := range tagValues {
tags[tag] = strconv.Itoa(t)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm tags was never used and the compiler never complained? that's strange

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was "used" in that it was populated. It was never used after that though. Enough to satisfy the compiler!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah yes, i've run into this a few times before :/

for i := 0; i < numInputSeries; i++ {
series := models.Series{
Target: strconv.Itoa(i),
}
if i%1 == 0 {
series.Datapoints = fn0()
} else {
series.Datapoints = fn1()

for _, tag := range tagValues {
series.Target += ";" + tag + "=" + strconv.Itoa(i%numOutputSeries)
}

series.Datapoints = test.RandFloats100()
input = append(input, series)
}
b.ResetTimer()
Expand All @@ -327,6 +379,16 @@ func benchmarkGroupByTags(b *testing.B, numSeries int, fn0, fn1 func() []schema.
if err != nil {
b.Fatalf("%s", err)
}

if len(results) != numOutputSeries {
b.Fatalf("Expected %d groups, got %d", numOutputSeries, len(results))
}

if true {
for _, serie := range results {
pointSlicePool.Put(serie.Datapoints[:0])
}
}
}
b.SetBytes(int64(numSeries * len(results[0].Datapoints) * 12))
b.SetBytes(int64(numInputSeries * len(results[0].Datapoints) * 12))
}
2 changes: 2 additions & 0 deletions test/points.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
var randFloats = make(map[int][]schema.Point)
var randFloatsWithNulls = make(map[int][]schema.Point)

// RandFloats100 returns 100 random-float points; convenience wrapper around RandFloats.
func RandFloats100() []schema.Point { return RandFloats(100) }
func RandFloats10k() []schema.Point { return RandFloats(10000) }
func RandFloats1M() []schema.Point { return RandFloats(1000000) }

Expand All @@ -30,6 +31,7 @@ func RandFloats(size int) []schema.Point {
return out
}

// RandFloatsWithNulls100 returns 100 random-float points with nulls mixed in; convenience wrapper around RandFloatsWithNulls.
func RandFloatsWithNulls100() []schema.Point { return RandFloatsWithNulls(100) }
func RandFloatsWithNulls10k() []schema.Point { return RandFloatsWithNulls(10000) }
func RandFloatsWithNulls1M() []schema.Point { return RandFloatsWithNulls(1000000) }

Expand Down