This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 2edab76

Merge pull request #1923 from grafana/psp-stats-with-nudge-fix
ConsolidateNudge: nudge without reducing capacity of underlying slice, to keep pointslicepool effective
Dieterbe authored Oct 15, 2020
2 parents: be20755 + e45c4ac
Showing 3 changed files with 22 additions and 19 deletions.
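
Why this matters, made concrete: the sketch below is not metrictank code, just a minimal demonstration of the slice-capacity effect named in the commit title. Re-slicing a pooled slice from the front shrinks its cap(), so a pool such as the pointslicepool gets back less capacity than it handed out; returning a skip count instead leaves the backing array whole.

package main

import "fmt"

func main() {
	buf := make([]int, 10) // stands in for a pooled []schema.Point
	fmt.Println(cap(buf))  // 10

	// old approach: strip nudged points by re-slicing from the front
	nudged := buf[3:]
	fmt.Println(cap(nudged)) // 7: capacity before the new start is lost to the pool

	// new approach: keep the slice, return how many points to skip
	skip := 3
	fmt.Println(cap(buf), skip) // 10 3: the full backing array stays reusable
}
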
consolidation/consolidate.go (35 changes: 19 additions & 16 deletions)
@@ -14,7 +14,7 @@ func ConsolidateContext(ctx context.Context, in []schema.Point, aggNum uint32, c
 		return nil
 	default:
 	}
-	return Consolidate(in, aggNum, consolidator)
+	return Consolidate(in, 0, aggNum, consolidator)
 }
 
 // ConsolidateNudged consolidates points in a "mostly-stable" way, meaning if you run the same function again so that the input
@@ -23,32 +23,35 @@ func ConsolidateContext(ctx context.Context, in []schema.Point, aggNum uint32, c
 // interval is the interval between the input points
 func ConsolidateNudged(points []schema.Point, interval, maxDataPoints uint32, consolidator Consolidator) ([]schema.Point, uint32) {
 	aggNum := AggEvery(uint32(len(points)), maxDataPoints)
-	points = nudgeMaybe(points, aggNum, interval)
-	points = Consolidate(points, aggNum, consolidator)
+	skip := nudgeMaybe(points, aggNum, interval)
+	points = Consolidate(points, skip, aggNum, consolidator)
 	return points, interval * aggNum
 }
 
 // Consolidate consolidates `in`, aggNum points at a time via the given function
 // note: the returned slice repurposes in's backing array.
 // it will always aggregate aggNum-sized groups of points together, with the timestamp of the last of them, and it always starts at the beginning,
 // possibly having a point at the end that didn't incorporate as much data
-func Consolidate(in []schema.Point, aggNum uint32, consolidator Consolidator) []schema.Point {
+// the first "skip" points will be ignored. It is the callers responsibility to make sure skip is not more than the length of the input
+func Consolidate(in []schema.Point, skip int, aggNum uint32, consolidator Consolidator) []schema.Point {
 	if consolidator == None {
 		panic("Consolidate called with consolidation.None. this should never happen")
 	}
 	num := int(aggNum)
 	aggFunc := GetAggFunc(consolidator)
 
+	skippedIn := in[skip:]
+
 	// let's see if the input data is a perfect fit for the requested aggNum
 	// (e.g. no remainder). This case is the easiest to handle
-	outLen := len(in) / num
+	outLen := len(skippedIn) / num
 	cleanLen := num * outLen
-	if len(in) == cleanLen {
+	if len(skippedIn) == cleanLen {
 		out := in[0:outLen]
 		var outI, nextI int
 		for inI := 0; inI < cleanLen; inI = nextI {
 			nextI = inI + num
-			out[outI] = schema.Point{Val: aggFunc(in[inI:nextI]), Ts: in[nextI-1].Ts}
+			out[outI] = schema.Point{Val: aggFunc(skippedIn[inI:nextI]), Ts: skippedIn[nextI-1].Ts}
 			outI += 1
 		}
 		return out
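
To see the new contract in action, here is a simplified, self-contained sketch of the perfect-fit branch above: it aggregates in[skip:] but writes results into in[0:outLen], using average as the aggregation function. consolidateAvg and point are illustrative names, not from the codebase.

package main

import "fmt"

type point struct {
	val float64
	ts  uint32
}

// consolidateAvg averages aggNum-sized groups of in[skip:], stamps each output
// point with the last input timestamp of its group, and writes results into
// in[0:outLen], so the caller keeps the original backing array and its capacity.
func consolidateAvg(in []point, skip, aggNum int) []point {
	skipped := in[skip:]
	outLen := len(skipped) / aggNum
	out := in[:outLen]
	for i := 0; i < outLen; i++ {
		group := skipped[i*aggNum : (i+1)*aggNum]
		sum := 0.0
		for _, p := range group {
			sum += p.val
		}
		// safe aliasing: out[i] is in[i], which lies at or before the group just read
		out[i] = point{val: sum / float64(aggNum), ts: group[aggNum-1].ts}
	}
	return out
}

func main() {
	in := []point{{1, 10}, {2, 20}, {3, 30}, {4, 40}, {5, 50}, {6, 60}, {7, 70}}
	// skip 1 nudged point, then average 3 at a time: (2,3,4)@40 and (5,6,7)@70
	fmt.Println(consolidateAvg(in, 1, 3)) // [{3 40} {6 70}]
}
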
Expand All @@ -60,7 +63,7 @@ func Consolidate(in []schema.Point, aggNum uint32, consolidator Consolidator) []
var outI, nextI int
for inI := 0; inI < cleanLen; inI = nextI {
nextI = inI + num
out[outI] = schema.Point{Val: aggFunc(in[inI:nextI]), Ts: in[nextI-1].Ts}
out[outI] = schema.Point{Val: aggFunc(skippedIn[inI:nextI]), Ts: skippedIn[nextI-1].Ts}
outI += 1
}

@@ -69,14 +72,14 @@ func Consolidate(in []schema.Point, aggNum uint32, consolidator Consolidator) []
 	// if the group would have been complete, i.e. points in the consolidation output should be evenly spaced.
 	// obviously we can only figure out the interval if we have at least 2 points
 	var lastTs uint32
-	if len(in) == 1 {
-		lastTs = in[0].Ts
+	if len(skippedIn) == 1 {
+		lastTs = skippedIn[0].Ts
 	} else {
-		interval := in[len(in)-1].Ts - in[len(in)-2].Ts
+		interval := skippedIn[len(skippedIn)-1].Ts - skippedIn[len(skippedIn)-2].Ts
 		// len 10, cleanLen 9, num 3 -> 3*4 values supposedly -> "in[11].Ts" -> in[9].Ts + 2*interval
-		lastTs = in[cleanLen].Ts + (aggNum-1)*interval
+		lastTs = skippedIn[cleanLen].Ts + (aggNum-1)*interval
 	}
-	out[outI] = schema.Point{Val: aggFunc(in[cleanLen:]), Ts: lastTs}
+	out[outI] = schema.Point{Val: aggFunc(skippedIn[cleanLen:]), Ts: lastTs}
 	return out
 }
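
The inline comment's example (len 10, cleanLen 9, num 3) can be worked end to end. A small standalone computation with made-up timestamps, 10 units apart, shows the extrapolated timestamp of the incomplete final group:

package main

import "fmt"

func main() {
	ts := []uint32{10, 20, 30, 40, 50, 60, 70, 80, 90, 100} // hypothetical timestamps
	aggNum := uint32(3)
	cleanLen := (uint32(len(ts)) / aggNum) * aggNum // 9: three full groups of 3
	interval := ts[len(ts)-1] - ts[len(ts)-2]       // 10, derived from the last two points
	// the leftover group holds only ts[9]=100; stamp it where its 3rd member would fall
	lastTs := ts[cleanLen] + (aggNum-1)*interval // 100 + 2*10 = 120
	fmt.Println(cleanLen, interval, lastTs)      // 9 10 120
}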

@@ -89,7 +92,7 @@ func AggEvery(numPoints, maxPoints uint32) uint32 {
 	return (numPoints + maxPoints - 1) / maxPoints
 }
 
-func nudgeMaybe(points []schema.Point, aggNum, interval uint32) []schema.Point {
+func nudgeMaybe(points []schema.Point, aggNum, interval uint32) int {
 	// note that the amount of points to strip by nudging is always < 1 postAggInterval's worth.
 	// there's 2 important considerations here:
 	// 1) we shouldn't make any too drastic alterations of the timerange returned compared to the requested time range
@@ -112,9 +115,9 @@ func nudgeMaybe(points []schema.Point, aggNum, interval uint32) []schema.Point {
 	// see the unit tests which explore cases like this (TestConsolidateNudgedNoTrimDueToNotManyPoints)
 	if len(points) > int(2*aggNum) {
 		_, num := nudge(points[0].Ts, interval, aggNum)
-		points = points[num:]
+		return num
 	}
-	return points
+	return 0
 }
 
 // Nudge computes the parameters for nudging:
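
nudge itself is outside this diff, so what follows is only a plausible sketch of what it computes, assuming (as the comments above suggest) that nudging strips the leading points that fall before the first post-aggregation-interval boundary, so that repeated queries over a sliding window produce the same buckets. nudgeSkip is a hypothetical stand-in, not the real function:

package main

import "fmt"

// nudgeSkip returns how many leading points to skip so the first aggregation
// group starts on a postAggInterval boundary (hypothetical reconstruction).
func nudgeSkip(firstTs, interval, aggNum uint32) int {
	postAggInterval := interval * aggNum
	remainder := firstTs % postAggInterval
	if remainder == 0 {
		return 0 // already aligned, nothing to strip
	}
	// points between firstTs and the next aligned boundary: always < one postAggInterval's worth
	return int((postAggInterval - remainder) / interval)
}

func main() {
	// points every 10s starting at ts=130, consolidated 3 at a time (30s buckets):
	// the next 30s boundary is 150, so strip (150-130)/10 = 2 leading points
	fmt.Println(nudgeSkip(130, 10, 3)) // 2
}
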
consolidation/consolidate_test.go (4 changes: 2 additions & 2 deletions)
@@ -16,7 +16,7 @@ type testCase struct {
 
 func validate(cases []testCase, t *testing.T) {
 	for i, c := range cases {
-		out := Consolidate(c.in, c.num, c.consol)
+		out := Consolidate(c.in, 0, c.num, c.consol)
 		if len(out) != len(c.out) {
 			t.Fatalf("output for testcase %d mismatch: expected: %v, got: %v", i, c.out, out)
 
@@ -502,7 +502,7 @@ func benchmarkConsolidate(fn func() []schema.Point, aggNum uint32, consolidator
 		in := fn()
 		l = len(in)
 		b.StartTimer()
-		ret := Consolidate(in, aggNum, consolidator)
+		ret := Consolidate(in, 0, aggNum, consolidator)
 		dummy = ret
 	}
 	b.SetBytes(int64(l * 12))
expr/normalize.go (2 changes: 1 addition & 1 deletion)
@@ -77,7 +77,7 @@ func NormalizeTo(in models.Series, interval uint32, sc seriescycle.SeriesCycler)
 	}
 
 	sc.Done(in)
-	in.Datapoints = consolidation.Consolidate(datapoints, interval/in.Interval, in.Consolidator)
+	in.Datapoints = consolidation.Consolidate(datapoints, 0, interval/in.Interval, in.Consolidator)
 	in.Interval = interval
 	sc.New(in)
