Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Merge pull request #970 from bloomberg/scaleToSeconds
Browse files Browse the repository at this point in the history
Add scaleToSeconds function
  • Loading branch information
Dieterbe authored Aug 7, 2018
2 parents 8d392cb + 624eb8d commit 2cb286b
Show file tree
Hide file tree
Showing 4 changed files with 300 additions and 25 deletions.
51 changes: 26 additions & 25 deletions docs/graphite.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,29 @@ See also:

Here are the currently included functions:

Function name and signature | Alias | Metrictank
----------------------------------------------------- | ------------ | ----------
alias(seriesList, alias) seriesList | | Stable
aliasByNode(seriesList, nodeList) seriesList | aliasByTags | Stable
aliasSub(seriesList, pattern, replacement) seriesList | | Stable
averageSeries(seriesLists) series | avg | Stable
consolidateBy(seriesList, func) seriesList | | Stable
diffSeries(seriesLists) series | | Stable
divideSeries(dividend, divisor) seriesList | | Stable
divideSeriesLists(dividends, divisors) seriesList | | Stable
exclude(seriesList, pattern) seriesList | | Stable
grep(seriesList, pattern) seriesList | | Stable
groupByTags(seriesList, func, tagList) seriesList | | Stable
isNonNull(seriesList) seriesList | | Stable
maxSeries(seriesList) series | max | Stable
minSeries(seriesList) series | min | Stable
multiplySeries(seriesList) series | | Stable
movingAverage(seriesLists, windowSize) seriesList | | Unstable
perSecond(seriesLists) seriesList | | Stable
rangeOfSeries(seriesList) series | | Stable
scale(seriesLists, num) series | | Stable
stddevSeries(seriesList) series | | Stable
sumSeries(seriesLists) series | sum | Stable
summarize(seriesList) seriesList | | Stable
transformNull(seriesList, default=0) seriesList | | Stable
| Function name and signature | Alias | Metrictank |
| ----------------------------------------------------- | ----------- | ---------- |
| alias(seriesList, alias) seriesList | | Stable |
| aliasByNode(seriesList, nodeList) seriesList | aliasByTags | Stable |
| aliasSub(seriesList, pattern, replacement) seriesList | | Stable |
| averageSeries(seriesLists) series | avg | Stable |
| consolidateBy(seriesList, func) seriesList | | Stable |
| diffSeries(seriesLists) series | | Stable |
| divideSeries(dividend, divisor) seriesList | | Stable |
| divideSeriesLists(dividends, divisors) seriesList | | Stable |
| exclude(seriesList, pattern) seriesList | | Stable |
| grep(seriesList, pattern) seriesList | | Stable |
| groupByTags(seriesList, func, tagList) seriesList | | Stable |
| isNonNull(seriesList) seriesList | | Stable |
| maxSeries(seriesList) series | max | Stable |
| minSeries(seriesList) series | min | Stable |
| multiplySeries(seriesList) series | | Stable |
| movingAverage(seriesLists, windowSize) seriesList | | Unstable |
| perSecond(seriesLists) seriesList | | Stable |
| rangeOfSeries(seriesList) series | | Stable |
| scale(seriesList, num) series | | Stable |
| scaleToSeconds(seriesList, seconds) series | | Stable |
| stddevSeries(seriesList) series | | Stable |
| sumSeries(seriesLists) series | sum | Stable |
| summarize(seriesList) seriesList | | Stable |
| transformNull(seriesList, default=0) seriesList | | Stable |
66 changes: 66 additions & 0 deletions expr/func_scaletoseconds.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package expr

import (
"fmt"
"math"
"strconv"

"github.com/grafana/metrictank/api/models"
schema "gopkg.in/raintank/schema.v1"
)

type FuncScaleToSeconds struct {
in GraphiteFunc
seconds float64
}

func NewScaleToSeconds() GraphiteFunc {
return &FuncScaleToSeconds{}
}

func (s *FuncScaleToSeconds) Signature() ([]Arg, []Arg) {
return []Arg{
ArgSeriesList{val: &s.in},
ArgFloat{key: "seconds", val: &s.seconds},
}, []Arg{ArgSeriesList{}}
}

func (s *FuncScaleToSeconds) Context(context Context) Context {
return context
}

func (s *FuncScaleToSeconds) Exec(cache map[Req][]models.Series) ([]models.Series, error) {
series, err := s.in.Exec(cache)
if err != nil {
return nil, err
}

out := make([]models.Series, len(series))
for i, serie := range series {
transformed := &out[i]
transformed.Target = fmt.Sprintf("scaleToSeconds(%s,%d)", serie.Target, int64(s.seconds))
transformed.QueryPatt = transformed.Target
transformed.Tags = make(map[string]string, len(serie.Tags)+1)
transformed.Datapoints = pointSlicePool.Get().([]schema.Point)
transformed.Interval = serie.Interval
transformed.Consolidator = serie.Consolidator
transformed.QueryCons = serie.QueryCons

for k, v := range serie.Tags {
transformed.Tags[k] = v
}
transformed.Tags["scaleToSeconds"] = strconv.FormatFloat(s.seconds, 'g', -1, 64)

factor := float64(s.seconds) / float64(serie.Interval)
for _, p := range serie.Datapoints {
if !math.IsNaN(p.Val) {
// round to 6 decimal places to mimic graphite
roundingFactor := math.Pow(10, 6)
p.Val = math.Round(p.Val*factor*roundingFactor) / roundingFactor
}
transformed.Datapoints = append(transformed.Datapoints, p)
}
}
cache[Req{}] = append(cache[Req{}], out...)
return out, nil
}
207 changes: 207 additions & 0 deletions expr/func_scaletoseconds_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package expr

import (
"math"
"strconv"
"testing"

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/test"
"gopkg.in/raintank/schema.v1"
)

func TestScaleToSecondsSingle(t *testing.T) {
testScaleToSeconds(
"identity",
[]models.Series{
{
Interval: 10,
QueryPatt: "a",
Target: "a",
Datapoints: getCopy(a),
},
},
[]models.Series{
{
Interval: 10,
QueryPatt: "scaleToSeconds(a,10)",
Datapoints: getCopy(a),
},
},
t,
10,
)
}

func TestScaleToSecondsSingleAllNonNull(t *testing.T) {
out := []schema.Point{
{Val: 0, Ts: 10},
{Val: 3.0437127721620759e+19, Ts: 20},
{Val: 1.8354510353341003e+20, Ts: 30},
{Val: 2.674777890687885e+19, Ts: 40},
{Val: 7.3786976294838198e+19, Ts: 50},
{Val: 2.3058430092136936e+20, Ts: 60},
}

testScaleToSeconds(
"identity-largeseconds",
[]models.Series{
{
Interval: 10,
QueryPatt: "d",
Target: "d",
Datapoints: getCopy(d),
},
},
[]models.Series{
{
Interval: 10,
QueryPatt: "scaleToSeconds(d,9223372036854774784)",
Datapoints: out,
},
},
t,
9223372036854774784,
)
}

func TestScaleToSecondsMulti(t *testing.T) {
out1 := []schema.Point{
{Val: 0, Ts: 10},
{Val: math.Inf(0), Ts: 20},
{Val: math.Inf(0), Ts: 30},
{Val: math.NaN(), Ts: 40},
{Val: 123456.7890, Ts: 50},
{Val: math.NaN(), Ts: 60},
}
out2 := []schema.Point{
{Val: 0, Ts: 10},
{Val: 0, Ts: 20},
{Val: 0.0001, Ts: 30},
{Val: 0.0002, Ts: 40},
{Val: 0.0003, Ts: 50},
{Val: 0.0004, Ts: 60},
}
testScaleToSeconds(
"multiple-series-subseconds",
[]models.Series{
{
Interval: 10,
QueryPatt: "b.*",
Target: "b.*",
Datapoints: getCopy(b),
},
{
Interval: 10,
QueryPatt: "c.foo{bar,baz}",
Target: "c.foo{bar,baz}",
Datapoints: getCopy(c),
},
},
[]models.Series{
{
QueryPatt: "scaleToSeconds(b.*,0)",
Datapoints: out1,
},
{
QueryPatt: "scaleToSeconds(c.foo{bar,baz},0)",
Datapoints: out2,
},
},
t,
0.001,
)
}

func testScaleToSeconds(name string, in []models.Series, out []models.Series, t *testing.T, seconds float64) {
f := NewScaleToSeconds()
f.(*FuncScaleToSeconds).in = NewMock(in)
f.(*FuncScaleToSeconds).seconds = seconds
gots, err := f.Exec(make(map[Req][]models.Series))
if err != nil {
t.Fatalf("case %q (%f): err should be nil. got %q", name, seconds, err)
}
if len(gots) != len(out) {
t.Fatalf("case %q (%f): isNonNull len output expected %d, got %d", name, seconds, len(out), len(gots))
}
for i, g := range gots {
exp := out[i]
if g.QueryPatt != exp.QueryPatt {
t.Fatalf("case %q (%f): expected target %q, got %q", name, seconds, exp.QueryPatt, g.QueryPatt)
}
if len(g.Datapoints) != len(exp.Datapoints) {
t.Fatalf("case %q (%f) len output expected %d, got %d", name, seconds, len(exp.Datapoints), len(g.Datapoints))
}
for j, p := range g.Datapoints {
bothNaN := math.IsNaN(p.Val) && math.IsNaN(exp.Datapoints[j].Val)
if (bothNaN || p.Val == exp.Datapoints[j].Val) && p.Ts == exp.Datapoints[j].Ts {
continue
}
t.Fatalf("case %q (%f): output point %d - expected %v got %v", name, seconds, j, exp.Datapoints[j], p)
}
}
}

func BenchmarkScaleToSeconds10k_1NoNulls(b *testing.B) {
benchmarkScaleToSeconds(b, 1, test.RandFloats10k, test.RandFloats10k)
}
func BenchmarkScaleToSeconds10k_10NoNulls(b *testing.B) {
benchmarkScaleToSeconds(b, 10, test.RandFloats10k, test.RandFloats10k)
}
func BenchmarkScaleToSeconds10k_100NoNulls(b *testing.B) {
benchmarkScaleToSeconds(b, 100, test.RandFloats10k, test.RandFloats10k)
}
func BenchmarkScaleToSeconds10k_1000NoNulls(b *testing.B) {
benchmarkScaleToSeconds(b, 1000, test.RandFloats10k, test.RandFloats10k)
}

func BenchmarkScaleToSeconds10k_1SomeSeriesHalfNulls(b *testing.B) {
benchmarkScaleToSeconds(b, 1, test.RandFloats10k, test.RandFloatsWithNulls10k)
}
func BenchmarkScaleToSeconds10k_10SomeSeriesHalfNulls(b *testing.B) {
benchmarkScaleToSeconds(b, 10, test.RandFloats10k, test.RandFloatsWithNulls10k)
}
func BenchmarkScaleToSeconds10k_100SomeSeriesHalfNulls(b *testing.B) {
benchmarkScaleToSeconds(b, 100, test.RandFloats10k, test.RandFloatsWithNulls10k)
}
func BenchmarkScaleToSeconds10k_1000SomeSeriesHalfNulls(b *testing.B) {
benchmarkScaleToSeconds(b, 1000, test.RandFloats10k, test.RandFloatsWithNulls10k)
}

func BenchmarkScaleToSeconds10k_1AllSeriesHalfNulls(b *testing.B) {
benchmarkScaleToSeconds(b, 1, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}
func BenchmarkScaleToSeconds10k_10AllSeriesHalfNulls(b *testing.B) {
benchmarkScaleToSeconds(b, 10, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}
func BenchmarkScaleToSeconds10k_100AllSeriesHalfNulls(b *testing.B) {
benchmarkScaleToSeconds(b, 100, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}
func BenchmarkScaleToSeconds10k_1000AllSeriesHalfNulls(b *testing.B) {
benchmarkScaleToSeconds(b, 1000, test.RandFloatsWithNulls10k, test.RandFloatsWithNulls10k)
}

func benchmarkScaleToSeconds(b *testing.B, numSeries int, fn0, fn1 func() []schema.Point) {
var input []models.Series
for i := 0; i < numSeries; i++ {
series := models.Series{
QueryPatt: strconv.Itoa(i),
}
if i%2 == 0 {
series.Datapoints = fn0()
} else {
series.Datapoints = fn1()
}
input = append(input, series)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
f := NewScaleToSeconds()
f.(*FuncScaleToSeconds).in = NewMock(input)
got, err := f.Exec(make(map[Req][]models.Series))
if err != nil {
b.Fatalf("%s", err)
}
results = got
}
}
1 change: 1 addition & 0 deletions expr/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func init() {
"perSecond": {NewPerSecond, true},
"rangeOfSeries": {NewAggregateConstructor("rangeOf", crossSeriesRange), true},
"scale": {NewScale, true},
"scaleToSeconds": {NewScaleToSeconds, true},
"smartSummarize": {NewSmartSummarize, false},
"sortByName": {NewSortByName, true},
"stddevSeries": {NewAggregateConstructor("stddev", crossSeriesStddev), true},
Expand Down

0 comments on commit 2cb286b

Please sign in to comment.