Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
consolidateNudged: nudge without reducing capacity of underlying slice
Browse files Browse the repository at this point in the history
without this fix, you would commonly see that slices going into the
pointslicepool would have a cap that is one or two points less then
what you need on subsequent reads. This fix increases efficacy of
the pool.
  • Loading branch information
Dieterbe committed Oct 14, 2020
1 parent f4d52da commit b23b039
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 19 deletions.
35 changes: 19 additions & 16 deletions consolidation/consolidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func ConsolidateContext(ctx context.Context, in []schema.Point, aggNum uint32, c
return nil
default:
}
return Consolidate(in, aggNum, consolidator)
return Consolidate(in, 0, aggNum, consolidator)
}

// ConsolidateNudged consolidates points in a "mostly-stable" way, meaning if you run the same function again so that the input
Expand All @@ -23,32 +23,35 @@ func ConsolidateContext(ctx context.Context, in []schema.Point, aggNum uint32, c
// interval is the interval between the input points
func ConsolidateNudged(points []schema.Point, interval, maxDataPoints uint32, consolidator Consolidator) ([]schema.Point, uint32) {
aggNum := AggEvery(uint32(len(points)), maxDataPoints)
points = nudgeMaybe(points, aggNum, interval)
points = Consolidate(points, aggNum, consolidator)
skip := nudgeMaybe(points, aggNum, interval)
points = Consolidate(points, skip, aggNum, consolidator)
return points, interval * aggNum
}

// Consolidate consolidates `in`, aggNum points at a time via the given function
// note: the returned slice repurposes in's backing array.
// it will always aggregate aggNum-sized groups of points together, with the timestamp of the last of them, and it always starts at the beginning,
// possibly having a point at the end that didn't incorporate as much data
func Consolidate(in []schema.Point, aggNum uint32, consolidator Consolidator) []schema.Point {
// the first "skip" points will be ignored. It is the callers responsibility to make sure skip is not more than the length of the input
func Consolidate(in []schema.Point, skip int, aggNum uint32, consolidator Consolidator) []schema.Point {
if consolidator == None {
panic("Consolidate called with consolidation.None. this should never happen")
}
num := int(aggNum)
aggFunc := GetAggFunc(consolidator)

skippedIn := in[skip:]

// let's see if the input data is a perfect fit for the requested aggNum
// (e.g. no remainder). This case is the easiest to handle
outLen := len(in) / num
outLen := len(skippedIn) / num
cleanLen := num * outLen
if len(in) == cleanLen {
if len(skippedIn) == cleanLen {
out := in[0:outLen]
var outI, nextI int
for inI := 0; inI < cleanLen; inI = nextI {
nextI = inI + num
out[outI] = schema.Point{Val: aggFunc(in[inI:nextI]), Ts: in[nextI-1].Ts}
out[outI] = schema.Point{Val: aggFunc(skippedIn[inI:nextI]), Ts: skippedIn[nextI-1].Ts}
outI += 1
}
return out
Expand All @@ -60,7 +63,7 @@ func Consolidate(in []schema.Point, aggNum uint32, consolidator Consolidator) []
var outI, nextI int
for inI := 0; inI < cleanLen; inI = nextI {
nextI = inI + num
out[outI] = schema.Point{Val: aggFunc(in[inI:nextI]), Ts: in[nextI-1].Ts}
out[outI] = schema.Point{Val: aggFunc(skippedIn[inI:nextI]), Ts: skippedIn[nextI-1].Ts}
outI += 1
}

Expand All @@ -69,14 +72,14 @@ func Consolidate(in []schema.Point, aggNum uint32, consolidator Consolidator) []
// if the group would have been complete, i.e. points in the consolidation output should be evenly spaced.
// obviously we can only figure out the interval if we have at least 2 points
var lastTs uint32
if len(in) == 1 {
lastTs = in[0].Ts
if len(skippedIn) == 1 {
lastTs = skippedIn[0].Ts
} else {
interval := in[len(in)-1].Ts - in[len(in)-2].Ts
interval := skippedIn[len(skippedIn)-1].Ts - skippedIn[len(skippedIn)-2].Ts
// len 10, cleanLen 9, num 3 -> 3*4 values supposedly -> "in[11].Ts" -> in[9].Ts + 2*interval
lastTs = in[cleanLen].Ts + (aggNum-1)*interval
lastTs = skippedIn[cleanLen].Ts + (aggNum-1)*interval
}
out[outI] = schema.Point{Val: aggFunc(in[cleanLen:]), Ts: lastTs}
out[outI] = schema.Point{Val: aggFunc(skippedIn[cleanLen:]), Ts: lastTs}
return out
}

Expand All @@ -89,7 +92,7 @@ func AggEvery(numPoints, maxPoints uint32) uint32 {
return (numPoints + maxPoints - 1) / maxPoints
}

func nudgeMaybe(points []schema.Point, aggNum, interval uint32) []schema.Point {
func nudgeMaybe(points []schema.Point, aggNum, interval uint32) int {
// note that the amount of points to strip by nudging is always < 1 postAggInterval's worth.
// there's 2 important considerations here:
// 1) we shouldn't make any too drastic alterations of the timerange returned compared to the requested time range
Expand All @@ -112,9 +115,9 @@ func nudgeMaybe(points []schema.Point, aggNum, interval uint32) []schema.Point {
// see the unit tests which explore cases like this (TestConsolidateNudgedNoTrimDueToNotManyPoints)
if len(points) > int(2*aggNum) {
_, num := nudge(points[0].Ts, interval, aggNum)
points = points[num:]
return num
}
return points
return 0
}

// Nudge computes the parameters for nudging:
Expand Down
4 changes: 2 additions & 2 deletions consolidation/consolidate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type testCase struct {

func validate(cases []testCase, t *testing.T) {
for i, c := range cases {
out := Consolidate(c.in, c.num, c.consol)
out := Consolidate(c.in, 0, c.num, c.consol)
if len(out) != len(c.out) {
t.Fatalf("output for testcase %d mismatch: expected: %v, got: %v", i, c.out, out)

Expand Down Expand Up @@ -502,7 +502,7 @@ func benchmarkConsolidate(fn func() []schema.Point, aggNum uint32, consolidator
in := fn()
l = len(in)
b.StartTimer()
ret := Consolidate(in, aggNum, consolidator)
ret := Consolidate(in, 0, aggNum, consolidator)
dummy = ret
}
b.SetBytes(int64(l * 12))
Expand Down
2 changes: 1 addition & 1 deletion expr/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func NormalizeTo(in models.Series, interval uint32, sc seriescycle.SeriesCycler)
}

sc.Done(in)
in.Datapoints = consolidation.Consolidate(datapoints, interval/in.Interval, in.Consolidator)
in.Datapoints = consolidation.Consolidate(datapoints, 0, interval/in.Interval, in.Consolidator)
in.Interval = interval
sc.New(in)

Expand Down

0 comments on commit b23b039

Please sign in to comment.