This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Run plan.Clean() upon error too + refactor pointslicepool #1858

Merged: 4 commits, Aug 28, 2020
Changes from all commits
18 changes: 6 additions & 12 deletions api/dataprocessor.go
@@ -74,16 +74,10 @@ func Fix(in []schema.Point, from, to, interval uint32) []schema.Point {
// the requested range is too narrow for the requested interval
return []schema.Point{}
}
// try to get a sufficiently sized slice from the pool. if it fails, allocate a new one.
var out []schema.Point

neededCap := int((last-first)/interval + 1)
candidate := pointSlicePool.Get().([]schema.Point)
if cap(candidate) >= neededCap {
out = candidate[:neededCap]
} else {
pointSlicePool.Put(candidate)
out = make([]schema.Point, neededCap)
}
out := pointSlicePool.GetMin(neededCap)
out = out[:neededCap]

// i iterates in. o iterates out. t is the ts we're looking to fill.
for t, i, o := first, 0, -1; t <= last; t += interval {
@@ -123,7 +117,7 @@ func Fix(in []schema.Point, from, to, interval uint32) []schema.Point {
o -= 1
}
}
pointSlicePool.Put(in[:0])
pointSlicePool.Put(in)

return out
}
@@ -149,7 +143,7 @@ func divide(pointsA, pointsB []schema.Point) []schema.Point {
for i := range pointsA {
pointsA[i].Val /= pointsB[i].Val
}
pointSlicePool.Put(pointsB[:0])
pointSlicePool.Put(pointsB)
return pointsA
}

@@ -484,7 +478,7 @@ func (s *Server) getSeries(ctx *requestContext, ss *models.StorageStats) (mdata.
func (s *Server) itersToPoints(ctx *requestContext, iters []tsz.Iter) []schema.Point {
pre := time.Now()

points := pointSlicePool.Get().([]schema.Point)
points := pointSlicePool.Get()
for _, iter := range iters {
total := 0
good := 0
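The branch removed from Fix() above (grab a pooled slice, check its capacity, otherwise put it back and allocate a new one) is what GetMin now encapsulates, and Put now takes the slice as-is instead of requiring every call site to reslice with [:0]. Below is a minimal sketch of the pool API these call sites appear to assume, inferred only from the removed code and the new calls in this diff; the real pointslicepool package may differ.

```go
// Illustrative sketch only: inferred from how the pool is used in this PR,
// not the actual code of github.com/grafana/metrictank/pointslicepool.
package pointslicepool

import (
	"sync"

	"github.com/grafana/metrictank/schema"
)

// DefaultPointSliceSize mirrors the removed api.defaultPointSliceSize (2000):
// bigger than most responses need, so pooled slices rarely have to grow.
const DefaultPointSliceSize = 2000

type PointSlicePool struct {
	p sync.Pool
}

func New(defaultSize int) *PointSlicePool {
	return &PointSlicePool{
		p: sync.Pool{
			New: func() interface{} { return make([]schema.Point, 0, defaultSize) },
		},
	}
}

// Get returns a zero-length slice with the pool's default capacity.
func (p *PointSlicePool) Get() []schema.Point {
	return p.p.Get().([]schema.Point)[:0]
}

// GetMin returns a zero-length slice with cap >= minCap. It encapsulates the
// branch that used to be inlined in Fix(): reuse a pooled slice if it is big
// enough, otherwise return it to the pool and allocate a fresh one.
func (p *PointSlicePool) GetMin(minCap int) []schema.Point {
	candidate := p.p.Get().([]schema.Point)
	if cap(candidate) >= minCap {
		return candidate[:0]
	}
	p.p.Put(candidate)
	return make([]schema.Point, 0, minCap)
}

// Put reslices to zero length itself, so callers can drop the explicit [:0].
func (p *PointSlicePool) Put(s []schema.Point) {
	p.p.Put(s[:0])
}
```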
4 changes: 2 additions & 2 deletions api/graphite.go
@@ -268,6 +268,7 @@ func (s *Server) renderMetrics(ctx *middleware.Context, request models.GraphiteR
execCtx, execSpan := tracing.NewSpan(ctx.Req.Context(), s.Tracer, "executePlan")
defer execSpan.Finish()
out, meta, err := s.executePlan(execCtx, ctx.OrgId, &plan)
defer plan.Clean()
if err != nil {
err := response.WrapError(err)
if err.HTTPStatusCode() == http.StatusBadRequest && !request.NoProxy && proxyBadRequests {
@@ -317,7 +318,7 @@ func (s *Server) renderMetrics(ctx *middleware.Context, request models.GraphiteR
response.Write(ctx, response.NewFastJson(200, models.SeriesByTarget(out)))
}
}
plan.Clean()
}

func (s *Server) metricsFind(ctx *middleware.Context, request models.GraphiteFind) {
@@ -897,7 +897,7 @@ func (s *Server) executePlan(ctx context.Context, orgId uint32, plan *expr.Plan)
New: func(in models.Series) {
},
Done: func(in models.Series) {
pointSlicePool.Put(in.Datapoints[:0])
pointSlicePool.Put(in.Datapoints)
},
})

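Moving plan.Clean() into a defer right after executePlan is what the PR title refers to: previously the early return in the error branch skipped the trailing plan.Clean() call at the end of renderMetrics, so pooled point slices were not returned to the pool on errors. A stripped-down illustration of that control flow, using hypothetical stand-ins named after the real functions rather than actual metrictank code:

```go
package main

import (
	"errors"
	"fmt"
)

// plan stands in for expr.Plan; Clean returns its pooled point slices.
type plan struct{}

func (p *plan) Clean() { fmt.Println("plan.Clean() ran") }

// executePlan stands in for Server.executePlan and fails here on purpose.
func executePlan(p *plan) error { return errors.New("bad request") }

func renderMetrics(p *plan) {
	err := executePlan(p)
	// As in the diff: the defer is registered right after executePlan, so
	// Clean runs on the early error return below as well as on success.
	// Before this PR, Clean was only called at the end of the function.
	defer p.Clean()
	if err != nil {
		fmt.Println("writing error response:", err)
		return
	}
	fmt.Println("writing 200 response")
}

func main() { renderMetrics(&plan{}) }
```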
20 changes: 4 additions & 16 deletions api/init.go
@@ -1,25 +1,13 @@
package api

import (
"sync"

"github.com/grafana/metrictank/expr"
"github.com/grafana/metrictank/schema"
"github.com/grafana/metrictank/pointslicepool"
)

// default size is probably bigger than what most responses need, but it saves [re]allocations
// also it's possible that occasionnally more size is needed, causing a realloc of underlying array, and that extra space will stick around until next GC run.
const defaultPointSliceSize = 2000

var pointSlicePool sync.Pool

func pointSlicePoolAllocNew() interface{} {
return make([]schema.Point, 0, defaultPointSliceSize)
}
var pointSlicePool *pointslicepool.PointSlicePool

func init() {
pointSlicePool = sync.Pool{
New: pointSlicePoolAllocNew,
}
expr.Pool(&pointSlicePool)
pointSlicePool = pointslicepool.New(pointslicepool.DefaultPointSliceSize)
expr.Pool(pointSlicePool)
}
2 changes: 1 addition & 1 deletion expr/datamap.go
@@ -26,7 +26,7 @@ func (dm DataMap) Add(r Req, s ...models.Series) {
func (dm DataMap) Clean() {
for _, series := range dm {
for _, serie := range series {
pointSlicePool.Put(serie.Datapoints[:0])
pointSlicePool.Put(serie.Datapoints)
}
}
}
3 changes: 1 addition & 2 deletions expr/func_absolute.go
@@ -5,7 +5,6 @@ import (
"math"

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/schema"
)

type FuncAbsolute struct {
@@ -36,7 +35,7 @@ func (s *FuncAbsolute) Exec(dataMap DataMap) ([]models.Series, error) {
serie.Target = fmt.Sprintf("absolute(%s)", serie.Target)
serie.Tags = serie.CopyTagsWith("absolute", "1")
serie.QueryPatt = fmt.Sprintf("absolute(%s)", serie.QueryPatt)
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()
@shanson7 (Collaborator) commented on Jul 10, 2020:

A lot of functions know how many datapoints they need and could use GetMin

The PR author (Contributor) replied:

true, but i don't have time now to go over all those

for _, p := range serie.Datapoints {
p.Val = math.Abs(p.Val)
out = append(out, p)
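Following up on the review thread above: a function like absolute() knows its output length up front (one point per input point), so it could ask the pool for exactly that capacity instead of a default-sized slice. A sketch of what the suggested GetMin variant could look like; it is not part of this PR (the author deferred it), and the helper name absolutePoints is made up for illustration:

```go
package expr // sketch: would live alongside the existing expr functions and their pointSlicePool var

import (
	"math"

	"github.com/grafana/metrictank/api/models"
	"github.com/grafana/metrictank/schema"
)

// absolutePoints is a hypothetical helper showing the pattern from the review
// comment: the output has exactly one point per input point, so the pool can
// be asked for that capacity up front via GetMin instead of Get.
func absolutePoints(serie models.Series) []schema.Point {
	out := pointSlicePool.GetMin(len(serie.Datapoints))
	for _, p := range serie.Datapoints {
		p.Val = math.Abs(p.Val) // p is a copy, so the input series is untouched
		out = append(out, p)
	}
	return out
}
```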
3 changes: 1 addition & 2 deletions expr/func_aggregate.go
@@ -6,7 +6,6 @@ import (
"unsafe"

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/schema"
)

type FuncAggregate struct {
@@ -69,7 +68,7 @@ func aggregate(dataMap DataMap, series []models.Series, queryPatts []string, agg
series[0].QueryPatt = name
return series, nil
}
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()

agg.function(series, &out)

12 changes: 6 additions & 6 deletions expr/func_aspercent.go
@@ -121,7 +121,7 @@ func (s *FuncAsPercent) execWithNodes(in, totals []models.Series, dataMap DataMa
nonesSerie.Tags = map[string]string{"name": nonesSerie.Target}

if nones == nil {
nones = pointSlicePool.Get().([]schema.Point)
nones = pointSlicePool.Get()
for _, p := range totalSerieByKey[key].Datapoints {
p.Val = math.NaN()
nones = append(nones, p)
@@ -144,7 +144,7 @@ func (s *FuncAsPercent) execWithNodes(in, totals []models.Series, dataMa
nonesSerie.Meta = serie1.Meta.Copy()

if nones == nil {
nones = pointSlicePool.Get().([]schema.Point)
nones = pointSlicePool.Get()
for _, p := range serie1.Datapoints {
p.Val = math.NaN()
nones = append(nones, p)
@@ -157,7 +157,7 @@ func (s *FuncAsPercent) execWithNodes(in, totals []models.Series, dataMa
} else {
// key found in both inByKey and totalSerieByKey
serie1, serie2 := NormalizeTwo(serie1, totalSerieByKey[key], NewCOWCycler(dataMap))
serie1 = serie1.Copy(pointSlicePool.Get().([]schema.Point))
serie1 = serie1.Copy(pointSlicePool.Get())
serie1.QueryPatt = fmt.Sprintf("asPercent(%s,%s)", serie1.QueryPatt, serie2.QueryPatt)
serie1.Target = fmt.Sprintf("asPercent(%s,%s)", serie1.Target, serie2.Target)
serie1.Tags = map[string]string{"name": serie1.Target}
@@ -216,12 +216,12 @@ func (s *FuncAsPercent) execWithoutNodes(in, totals []models.Series, dataMap Dat
}
if len(totalsSerie.Datapoints) > 0 {
serie, totalsSerie = NormalizeTwo(serie, totalsSerie, NewCOWCycler(dataMap))
serie = serie.Copy(pointSlicePool.Get().([]schema.Point))
serie = serie.Copy(pointSlicePool.Get())
for i := range serie.Datapoints {
serie.Datapoints[i].Val = computeAsPercent(serie.Datapoints[i].Val, totalsSerie.Datapoints[i].Val)
}
} else {
serie = serie.Copy(pointSlicePool.Get().([]schema.Point))
serie = serie.Copy(pointSlicePool.Get())
for i := range serie.Datapoints {
serie.Datapoints[i].Val = computeAsPercent(serie.Datapoints[i].Val, s.totalFloat)
}
@@ -284,7 +284,7 @@ func sumSeries(in []models.Series, dataMap DataMap) models.Series {
if len(in) == 1 {
return in[0]
}
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()
crossSeriesSum(in, &out)
var queryPatts []string
var meta models.SeriesMeta
2 changes: 1 addition & 1 deletion expr/func_constantline.go
@@ -36,7 +36,7 @@ func (s *FuncConstantLine) Context(context Context) Context {
}

func (s *FuncConstantLine) Exec(dataMap DataMap) ([]models.Series, error) {
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()

out = append(out, schema.Point{Val: s.value, Ts: s.first})
diff := s.last - s.first
3 changes: 1 addition & 2 deletions expr/func_countseries.go
@@ -5,7 +5,6 @@ import (
"strings"

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/schema"
)

type FuncCountSeries struct {
@@ -38,7 +37,7 @@ func (s *FuncCountSeries) Exec(dataMap DataMap) ([]models.Series, error) {

cons, queryCons := summarizeCons(series)
name := fmt.Sprintf("countSeries(%s)", strings.Join(queryPatts, ","))
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()

// note: if series have different intervals, we could try to be clever and pick the one with highest resolution
// as it's more likely to be useful when combined with other functions, but that's too much hassle
3 changes: 1 addition & 2 deletions expr/func_derivative.go
@@ -6,7 +6,6 @@ import (

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/consolidation"
"github.com/grafana/metrictank/schema"
)

type FuncDerivative struct {
@@ -39,7 +38,7 @@ func (s *FuncDerivative) Exec(dataMap DataMap) ([]models.Series, error) {
serie.QueryPatt = fmt.Sprintf("derivative(%s)", serie.QueryPatt)
serie.Consolidator = consolidation.None
serie.QueryCons = consolidation.None
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()

prev := math.NaN()
for _, p := range serie.Datapoints {
2 changes: 1 addition & 1 deletion expr/func_divideseries.go
@@ -65,7 +65,7 @@ func (s *FuncDivideSeries) Exec(dataMap DataMap) ([]models.Series, error) {
divisorsByRes := make(map[uint32]models.Series)
divisorsByRes[divisors[0].Interval] = divisors[0]
for _, dividend := range dividends {
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()
divisor := divisors[0]
if dividend.Interval != divisors[0].Interval {
lcm := util.Lcm([]uint32{dividend.Interval, divisor.Interval})
2 changes: 1 addition & 1 deletion expr/func_divideserieslists.go
@@ -54,7 +54,7 @@ func (s *FuncDivideSeriesLists) Exec(dataMap DataMap) ([]models.Series, error) {
for i := range dividends {
dividend, divisor := NormalizeTwo(dividends[i], divisors[i], NewCOWCycler(dataMap))

out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()
for i := 0; i < len(dividend.Datapoints); i++ {
p := schema.Point{
Ts: dividend.Datapoints[i].Ts,
3 changes: 1 addition & 2 deletions expr/func_groupbynodes.go
@@ -2,7 +2,6 @@ package expr

import (
"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/schema"
)

type FuncGroupByNodes struct {
@@ -96,7 +95,7 @@ func (s *FuncGroupByNodes) Exec(dataMap DataMap) ([]models.Series, error) {
group.s = Normalize(group.s, NewCOWCycler(dataMap))
outSeries.Interval = group.s[0].Interval
outSeries.SetTags()
outSeries.Datapoints = pointSlicePool.Get().([]schema.Point)
outSeries.Datapoints = pointSlicePool.Get()
aggFunc(group.s, &outSeries.Datapoints)
output = append(output, outSeries)
}
2 changes: 1 addition & 1 deletion expr/func_groupbynodes_test.go
@@ -361,7 +361,7 @@ func benchmarkGroupByNodes(b *testing.B, numIn, numOut int) {
}

for _, serie := range results {
pointSlicePool.Put(serie.Datapoints[:0])
pointSlicePool.Put(serie.Datapoints)
}
}

3 changes: 1 addition & 2 deletions expr/func_groupbytags.go
@@ -6,7 +6,6 @@ import (

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/errors"
"github.com/grafana/metrictank/schema"
)

type FuncGroupByTags struct {
@@ -131,7 +130,7 @@ func (s *FuncGroupByTags) Exec(dataMap DataMap) ([]models.Series, error) {
}
newSeries.SetTags()

newSeries.Datapoints = pointSlicePool.Get().([]schema.Point)
newSeries.Datapoints = pointSlicePool.Get()
group.s = Normalize(group.s, NewCOWCycler(dataMap))
aggFunc(group.s, &newSeries.Datapoints)
dataMap.Add(Req{}, newSeries)
2 changes: 1 addition & 1 deletion expr/func_groupbytags_test.go
@@ -361,7 +361,7 @@ func benchmarkGroupByTags(b *testing.B, numInputSeries, numOutputSeries int) {

if true {
for _, serie := range results {
pointSlicePool.Put(serie.Datapoints[:0])
pointSlicePool.Put(serie.Datapoints)
}
}
}
3 changes: 1 addition & 2 deletions expr/func_integral.go
@@ -6,7 +6,6 @@ import (

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/consolidation"
"github.com/grafana/metrictank/schema"
)

type FuncIntegral struct {
@@ -42,7 +41,7 @@ func (s *FuncIntegral) Exec(dataMap DataMap) ([]models.Series, error) {

current := 0.0

out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()
for _, p := range serie.Datapoints {
if !math.IsNaN(p.Val) {
current += p.Val
2 changes: 1 addition & 1 deletion expr/func_invert.go
@@ -41,7 +41,7 @@ func (s *FuncInvert) Exec(dataMap DataMap) ([]models.Series, error) {
}

for i, serie := range series {
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()

for _, p := range serie.Datapoints {
out = append(out, schema.Point{Val: newVal(p.Val), Ts: p.Ts})
3 changes: 1 addition & 2 deletions expr/func_isnonnull.go
@@ -5,7 +5,6 @@ import (
"math"

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/schema"
)

type FuncIsNonNull struct {
@@ -37,7 +36,7 @@ func (s *FuncIsNonNull) Exec(dataMap DataMap) ([]models.Series, error) {
serie.QueryPatt = fmt.Sprintf("isNonNull(%s)", serie.QueryPatt)
serie.Tags = serie.CopyTagsWith("isNonNull", "1")

out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()
for _, p := range serie.Datapoints {
if math.IsNaN(p.Val) {
p.Val = 0
3 changes: 1 addition & 2 deletions expr/func_keeplastvalue.go
@@ -5,7 +5,6 @@ import (
"math"

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/schema"
)

type FuncKeepLastValue struct {
@@ -50,7 +49,7 @@ func (s *FuncKeepLastValue) Exec(dataMap DataMap) ([]models.Series, error) {
for _, serie := range series {
serie.Target = fmt.Sprintf("keepLastValue(%s)", serie.Target)
serie.QueryPatt = serie.Target
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()

var consecutiveNaNs int
lastVal := math.NaN()
2 changes: 1 addition & 1 deletion expr/func_minmax.go
@@ -52,7 +52,7 @@ func (s *FuncMinMax) Exec(dataMap DataMap) ([]models.Series, error) {
outputs := make([]models.Series, 0, len(series))

for _, serie := range series {
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()

minVal := valOrDefault(batch.Min(serie.Datapoints), 0)
maxVal := valOrDefault(batch.Max(serie.Datapoints), 0)
3 changes: 1 addition & 2 deletions expr/func_nonnegativederivative.go
@@ -6,7 +6,6 @@ import (

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/consolidation"
"github.com/grafana/metrictank/schema"
)

type FuncNonNegativeDerivative struct {
@@ -40,7 +39,7 @@ func (s *FuncNonNegativeDerivative) Exec(dataMap DataMap) ([]models.Series, erro
outSeries := make([]models.Series, 0, len(series))

for _, serie := range series {
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()

prev := math.NaN()
for _, p := range serie.Datapoints {