This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit

make 1 single pointslicepool type that has a GetMin() method
while we're at it, add some syntactic sugar
Dieterbe committed Jul 9, 2020
1 parent 433ab39 commit 206cced
Showing 35 changed files with 102 additions and 102 deletions.
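The pointslicepool package itself is among the 35 changed files, but its hunks are not included below. Judging only from the call sites in this commit — pointslicepool.New, pointslicepool.DefaultPointSliceSize, Get, GetMin and Put — the type probably looks roughly like the sketch below; the struct fields and internal logic are assumptions, not the actual file from the commit.

```go
// Hedged sketch of the new pointslicepool package. The exported names
// (New, DefaultPointSliceSize, Get, GetMin, Put) and the import path are taken
// from the call sites in this commit; field names and internal logic are guesses.
package pointslicepool

import (
	"sync"

	"github.com/grafana/metrictank/schema"
)

// DefaultPointSliceSize mirrors the constant previously defined in api/init.go.
const DefaultPointSliceSize = 2000

type PointSlicePool struct {
	defaultSize int
	p           sync.Pool
}

func New(defaultSize int) *PointSlicePool {
	return &PointSlicePool{
		defaultSize: defaultSize,
		p: sync.Pool{
			New: func() interface{} { return make([]schema.Point, 0, defaultSize) },
		},
	}
}

// Get returns a zero-length slice with the pool's default capacity.
func (p *PointSlicePool) Get() []schema.Point {
	return p.p.Get().([]schema.Point)
}

// GetMin returns a zero-length slice with capacity of at least minCap,
// replacing the inline "check cap, else allocate" dance that Fix used to do.
func (p *PointSlicePool) GetMin(minCap int) []schema.Point {
	candidate := p.Get()
	if cap(candidate) >= minCap {
		return candidate
	}
	p.Put(candidate)
	return make([]schema.Point, 0, minCap)
}

// Put resets the slice's length before returning it to the pool, which is
// presumably the "syntactic sugar": call sites no longer need the [:0].
func (p *PointSlicePool) Put(s []schema.Point) {
	p.p.Put(s[:0])
}
```

In other words, GetMin centralizes the capacity check that Fix previously did inline, and Put absorbing the [:0] reslice is likely the "syntactic sugar" mentioned in the commit message.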
20 changes: 7 additions & 13 deletions api/dataprocessor.go
@@ -71,16 +71,10 @@ func Fix(in []schema.Point, from, to, interval uint32) []schema.Point {
// the requested range is too narrow for the requested interval
return []schema.Point{}
}
// try to get a sufficiently sized slice from the pool. if it fails, allocate a new one.
var out []schema.Point

neededCap := int((last-first)/interval + 1)
candidate := pointSlicePool.Get().([]schema.Point)
if cap(candidate) >= neededCap {
out = candidate[:neededCap]
} else {
pointSlicePool.Put(candidate)
out = make([]schema.Point, neededCap)
}
out := pointSlicePool.GetMin(neededCap)
out = out[:neededCap]

// i iterates in. o iterates out. t is the ts we're looking to fill.
for t, i, o := first, 0, -1; t <= last; t += interval {
@@ -120,7 +114,7 @@ func Fix(in []schema.Point, from, to, interval uint32) []schema.Point {
o -= 1
}
}
pointSlicePool.Put(in[:0])
pointSlicePool.Put(in)

return out
}
@@ -146,7 +140,7 @@ func divide(pointsA, pointsB []schema.Point) []schema.Point {
for i := range pointsA {
pointsA[i].Val /= pointsB[i].Val
}
pointSlicePool.Put(pointsB[:0])
pointSlicePool.Put(pointsB)
return pointsA
}

@@ -480,7 +474,7 @@ func (s *Server) getSeries(ctx *requestContext, ss *models.StorageStats) (mdata.
func (s *Server) itersToPoints(ctx *requestContext, iters []tsz.Iter) []schema.Point {
pre := time.Now()

points := pointSlicePool.Get().([]schema.Point)
points := pointSlicePool.Get()
for _, iter := range iters {
total := 0
good := 0
@@ -666,7 +660,7 @@ func mergeSeries(in []models.Series, dataMap expr.DataMap) []models.Series {
}
}
for j := 1; j < len(series); j++ {
pointSlicePool.Put(series[j].Datapoints[:0])
pointSlicePool.Put(series[j].Datapoints)
}
merged[i] = series[0]
for j := 1; j < len(series); j++ {
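The remaining files repeat the same two substitutions seen above: Get() now returns []schema.Point directly (no type assertion), and Put() is called with the full slice instead of slice[:0]. A hypothetical helper — scale is not a function in this commit — showing the resulting calling convention:

```go
// Hypothetical example (not part of the commit), written against the new pool
// API as used in the api package: GetMin hands out a zero-length slice with
// enough capacity, the caller appends into it, and the input slice goes back
// to the pool without an explicit [:0].
func scale(in []schema.Point, factor float64) []schema.Point {
	out := pointSlicePool.GetMin(len(in))
	for _, p := range in {
		out = append(out, schema.Point{Val: p.Val * factor, Ts: p.Ts})
	}
	pointSlicePool.Put(in) // Put resets the length internally
	return out
}
```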
20 changes: 4 additions & 16 deletions api/init.go
@@ -1,25 +1,13 @@
package api

import (
"sync"

"github.com/grafana/metrictank/expr"
"github.com/grafana/metrictank/schema"
"github.com/grafana/metrictank/pointslicepool"
)

// default size is probably bigger than what most responses need, but it saves [re]allocations
// also it's possible that occasionnally more size is needed, causing a realloc of underlying array, and that extra space will stick around until next GC run.
const defaultPointSliceSize = 2000

var pointSlicePool sync.Pool

func pointSlicePoolAllocNew() interface{} {
return make([]schema.Point, 0, defaultPointSliceSize)
}
var pointSlicePool *pointslicepool.PointSlicePool

func init() {
pointSlicePool = sync.Pool{
New: pointSlicePoolAllocNew,
}
expr.Pool(&pointSlicePool)
pointSlicePool = pointslicepool.New(pointslicepool.DefaultPointSliceSize)
expr.Pool(pointSlicePool)
}
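The expr side of the expr.Pool(pointSlicePool) call above is not shown in this excerpt. Based only on that call and on the expr functions below referring to pointSlicePool, it presumably just stores the pointer at package level, along these lines (the variable name and body are assumptions):

```go
// Hedged sketch of expr's Pool hook; the real implementation may differ.
package expr

import "github.com/grafana/metrictank/pointslicepool"

// pointSlicePool is the package-level pool used by the expr functions below.
var pointSlicePool *pointslicepool.PointSlicePool

// Pool lets the api package hand its pool to expr so both share one pool.
func Pool(p *pointslicepool.PointSlicePool) {
	pointSlicePool = p
}
```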
2 changes: 1 addition & 1 deletion expr/datamap.go
@@ -26,7 +26,7 @@ func (dm DataMap) Add(r Req, s ...models.Series) {
func (dm DataMap) Clean() {
for _, series := range dm {
for _, serie := range series {
pointSlicePool.Put(serie.Datapoints[:0])
pointSlicePool.Put(serie.Datapoints)
}
}
}
3 changes: 1 addition & 2 deletions expr/func_absolute.go
@@ -5,7 +5,6 @@ import (
"math"

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/schema"
)

type FuncAbsolute struct {
@@ -36,7 +35,7 @@ func (s *FuncAbsolute) Exec(dataMap DataMap) ([]models.Series, error) {
serie.Target = fmt.Sprintf("absolute(%s)", serie.Target)
serie.Tags = serie.CopyTagsWith("absolute", "1")
serie.QueryPatt = fmt.Sprintf("absolute(%s)", serie.QueryPatt)
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()
for _, p := range serie.Datapoints {
p.Val = math.Abs(p.Val)
out = append(out, p)
3 changes: 1 addition & 2 deletions expr/func_aggregate.go
@@ -6,7 +6,6 @@ import (
"unsafe"

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/schema"
)

type FuncAggregate struct {
@@ -69,7 +68,7 @@ func aggregate(dataMap DataMap, series []models.Series, queryPatts []string, agg
series[0].QueryPatt = name
return series, nil
}
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()

agg.function(series, &out)

12 changes: 6 additions & 6 deletions expr/func_aspercent.go
@@ -121,7 +121,7 @@ func (s *FuncAsPercent) execWithNodes(in, totals []models.Series, dataMap DataMa
nonesSerie.Tags = map[string]string{"name": nonesSerie.Target}

if nones == nil {
nones = pointSlicePool.Get().([]schema.Point)
nones = pointSlicePool.Get()
for _, p := range totalSerieByKey[key].Datapoints {
p.Val = math.NaN()
nones = append(nones, p)
@@ -144,7 +144,7 @@
nonesSerie.Meta = serie1.Meta.Copy()

if nones == nil {
nones = pointSlicePool.Get().([]schema.Point)
nones = pointSlicePool.Get()
for _, p := range serie1.Datapoints {
p.Val = math.NaN()
nones = append(nones, p)
@@ -157,7 +157,7 @@
} else {
// key found in both inByKey and totalSerieByKey
serie1, serie2 := NormalizeTwo(dataMap, serie1, totalSerieByKey[key])
serie1 = serie1.Copy(pointSlicePool.Get().([]schema.Point))
serie1 = serie1.Copy(pointSlicePool.Get())
serie1.QueryPatt = fmt.Sprintf("asPercent(%s,%s)", serie1.QueryPatt, serie2.QueryPatt)
serie1.Target = fmt.Sprintf("asPercent(%s,%s)", serie1.Target, serie2.Target)
serie1.Tags = map[string]string{"name": serie1.Target}
@@ -216,12 +216,12 @@ func (s *FuncAsPercent) execWithoutNodes(in, totals []models.Series, dataMap Dat
}
if len(totalsSerie.Datapoints) > 0 {
serie, totalsSerie = NormalizeTwo(dataMap, serie, totalsSerie)
serie = serie.Copy(pointSlicePool.Get().([]schema.Point))
serie = serie.Copy(pointSlicePool.Get())
for i := range serie.Datapoints {
serie.Datapoints[i].Val = computeAsPercent(serie.Datapoints[i].Val, totalsSerie.Datapoints[i].Val)
}
} else {
serie = serie.Copy(pointSlicePool.Get().([]schema.Point))
serie = serie.Copy(pointSlicePool.Get())
for i := range serie.Datapoints {
serie.Datapoints[i].Val = computeAsPercent(serie.Datapoints[i].Val, s.totalFloat)
}
@@ -284,7 +284,7 @@ func sumSeries(in []models.Series, dataMap DataMap) models.Series {
if len(in) == 1 {
return in[0]
}
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()
crossSeriesSum(in, &out)
var queryPatts []string
var meta models.SeriesMeta
2 changes: 1 addition & 1 deletion expr/func_constantline.go
@@ -36,7 +36,7 @@ func (s *FuncConstantLine) Context(context Context) Context {
}

func (s *FuncConstantLine) Exec(dataMap DataMap) ([]models.Series, error) {
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()

out = append(out, schema.Point{Val: s.value, Ts: s.first})
diff := s.last - s.first
3 changes: 1 addition & 2 deletions expr/func_countseries.go
@@ -5,7 +5,6 @@ import (
"strings"

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/schema"
)

type FuncCountSeries struct {
@@ -38,7 +37,7 @@ func (s *FuncCountSeries) Exec(dataMap DataMap) ([]models.Series, error) {

cons, queryCons := summarizeCons(series)
name := fmt.Sprintf("countSeries(%s)", strings.Join(queryPatts, ","))
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()

// note: if series have different intervals, we could try to be clever and pick the one with highest resolution
// as it's more likely to be useful when combined with other functions, but that's too much hassle
3 changes: 1 addition & 2 deletions expr/func_derivative.go
@@ -6,7 +6,6 @@ import (

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/consolidation"
"github.com/grafana/metrictank/schema"
)

type FuncDerivative struct {
@@ -39,7 +38,7 @@ func (s *FuncDerivative) Exec(dataMap DataMap) ([]models.Series, error) {
serie.QueryPatt = fmt.Sprintf("derivative(%s)", serie.QueryPatt)
serie.Consolidator = consolidation.None
serie.QueryCons = consolidation.None
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()

prev := math.NaN()
for _, p := range serie.Datapoints {
2 changes: 1 addition & 1 deletion expr/func_divideseries.go
@@ -65,7 +65,7 @@ func (s *FuncDivideSeries) Exec(dataMap DataMap) ([]models.Series, error) {
divisorsByRes := make(map[uint32]models.Series)
divisorsByRes[divisors[0].Interval] = divisors[0]
for _, dividend := range dividends {
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()
divisor := divisors[0]
if dividend.Interval != divisors[0].Interval {
lcm := util.Lcm([]uint32{dividend.Interval, divisor.Interval})
2 changes: 1 addition & 1 deletion expr/func_divideserieslists.go
@@ -54,7 +54,7 @@ func (s *FuncDivideSeriesLists) Exec(dataMap DataMap) ([]models.Series, error) {
for i := range dividends {
dividend, divisor := NormalizeTwo(dataMap, dividends[i], divisors[i])

out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()
for i := 0; i < len(dividend.Datapoints); i++ {
p := schema.Point{
Ts: dividend.Datapoints[i].Ts,
3 changes: 1 addition & 2 deletions expr/func_groupbynodes.go
@@ -2,7 +2,6 @@ package expr

import (
"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/schema"
)

type FuncGroupByNodes struct {
@@ -96,7 +95,7 @@ func (s *FuncGroupByNodes) Exec(dataMap DataMap) ([]models.Series, error) {
group.s = Normalize(dataMap, group.s)
outSeries.Interval = group.s[0].Interval
outSeries.SetTags()
outSeries.Datapoints = pointSlicePool.Get().([]schema.Point)
outSeries.Datapoints = pointSlicePool.Get()
aggFunc(group.s, &outSeries.Datapoints)
output = append(output, outSeries)
}
2 changes: 1 addition & 1 deletion expr/func_groupbynodes_test.go
@@ -361,7 +361,7 @@ func benchmarkGroupByNodes(b *testing.B, numIn, numOut int) {
}

for _, serie := range results {
pointSlicePool.Put(serie.Datapoints[:0])
pointSlicePool.Put(serie.Datapoints)
}
}

3 changes: 1 addition & 2 deletions expr/func_groupbytags.go
@@ -6,7 +6,6 @@ import (

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/errors"
"github.com/grafana/metrictank/schema"
)

type FuncGroupByTags struct {
@@ -131,7 +130,7 @@ func (s *FuncGroupByTags) Exec(dataMap DataMap) ([]models.Series, error) {
}
newSeries.SetTags()

newSeries.Datapoints = pointSlicePool.Get().([]schema.Point)
newSeries.Datapoints = pointSlicePool.Get()
group.s = Normalize(dataMap, group.s)
aggFunc(group.s, &newSeries.Datapoints)
dataMap.Add(Req{}, newSeries)
2 changes: 1 addition & 1 deletion expr/func_groupbytags_test.go
@@ -361,7 +361,7 @@ func benchmarkGroupByTags(b *testing.B, numInputSeries, numOutputSeries int) {

if true {
for _, serie := range results {
pointSlicePool.Put(serie.Datapoints[:0])
pointSlicePool.Put(serie.Datapoints)
}
}
}
3 changes: 1 addition & 2 deletions expr/func_integral.go
@@ -6,7 +6,6 @@ import (

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/consolidation"
"github.com/grafana/metrictank/schema"
)

type FuncIntegral struct {
@@ -42,7 +41,7 @@ func (s *FuncIntegral) Exec(dataMap DataMap) ([]models.Series, error) {

current := 0.0

out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()
for _, p := range serie.Datapoints {
if !math.IsNaN(p.Val) {
current += p.Val
2 changes: 1 addition & 1 deletion expr/func_invert.go
@@ -41,7 +41,7 @@ func (s *FuncInvert) Exec(dataMap DataMap) ([]models.Series, error) {
}

for i, serie := range series {
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()

for _, p := range serie.Datapoints {
out = append(out, schema.Point{Val: newVal(p.Val), Ts: p.Ts})
3 changes: 1 addition & 2 deletions expr/func_isnonnull.go
@@ -5,7 +5,6 @@ import (
"math"

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/schema"
)

type FuncIsNonNull struct {
@@ -37,7 +36,7 @@ func (s *FuncIsNonNull) Exec(dataMap DataMap) ([]models.Series, error) {
serie.QueryPatt = fmt.Sprintf("isNonNull(%s)", serie.QueryPatt)
serie.Tags = serie.CopyTagsWith("isNonNull", "1")

out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()
for _, p := range serie.Datapoints {
if math.IsNaN(p.Val) {
p.Val = 0
3 changes: 1 addition & 2 deletions expr/func_keeplastvalue.go
@@ -5,7 +5,6 @@ import (
"math"

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/schema"
)

type FuncKeepLastValue struct {
@@ -50,7 +49,7 @@ func (s *FuncKeepLastValue) Exec(dataMap DataMap) ([]models.Series, error) {
for _, serie := range series {
serie.Target = fmt.Sprintf("keepLastValue(%s)", serie.Target)
serie.QueryPatt = serie.Target
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()

var consecutiveNaNs int
lastVal := math.NaN()
2 changes: 1 addition & 1 deletion expr/func_minmax.go
@@ -52,7 +52,7 @@ func (s *FuncMinMax) Exec(dataMap DataMap) ([]models.Series, error) {
outputs := make([]models.Series, 0, len(series))

for _, serie := range series {
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()

minVal := valOrDefault(batch.Min(serie.Datapoints), 0)
maxVal := valOrDefault(batch.Max(serie.Datapoints), 0)
3 changes: 1 addition & 2 deletions expr/func_nonnegativederivative.go
@@ -6,7 +6,6 @@ import (

"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/consolidation"
"github.com/grafana/metrictank/schema"
)

type FuncNonNegativeDerivative struct {
@@ -40,7 +39,7 @@ func (s *FuncNonNegativeDerivative) Exec(dataMap DataMap) ([]models.Series, erro
outSeries := make([]models.Series, 0, len(series))

for _, serie := range series {
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()

prev := math.NaN()
for _, p := range serie.Datapoints {
2 changes: 1 addition & 1 deletion expr/func_offset.go
@@ -37,7 +37,7 @@ func (s *FuncOffset) Exec(dataMap DataMap) ([]models.Series, error) {

outSeries := make([]models.Series, 0, len(series))
for _, serie := range series {
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()
for _, v := range serie.Datapoints {
out = append(out, schema.Point{Val: v.Val + s.factor, Ts: v.Ts})
}
2 changes: 1 addition & 1 deletion expr/func_persecond.go
@@ -44,7 +44,7 @@ func (s *FuncPerSecond) Exec(dataMap DataMap) ([]models.Series, error) {

outSeries := make([]models.Series, 0, len(series))
for _, serie := range series {
out := pointSlicePool.Get().([]schema.Point)
out := pointSlicePool.Get()
for i, v := range serie.Datapoints {
out = append(out, schema.Point{Ts: v.Ts})
if i == 0 || math.IsNaN(v.Val) || math.IsNaN(serie.Datapoints[i-1].Val) {
