This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 43927d2
Merge pull request #919 from grafana/timestamp-fixes-etc-wtf-circleci
Timestamp fixes etc
Dieterbe authored May 20, 2018
2 parents babc254 + 3823c2b commit 43927d2
Showing 15 changed files with 313 additions and 168 deletions.
69 changes: 54 additions & 15 deletions api/dataprocessor.go
@@ -93,12 +93,13 @@ func Fix(in []schema.Point, from, to, interval uint32) []schema.Point {
} else if p.Ts > t {
// point is too recent, append a null and reconsider same point for next slot
out[o] = schema.Point{Val: math.NaN(), Ts: t}
} else if p.Ts > t-interval && p.Ts < t {
// point is a bit older, so it's good enough, just quantize the ts, and move on to next point for next round
} else if p.Ts > t-interval {
// point is older but not by more than 1 interval, so it's good enough, just quantize the ts, and move on to next point for next round
out[o] = schema.Point{Val: p.Val, Ts: t}
i++
} else if p.Ts <= t-interval {
// point is too old. advance until we find a point that is recent enough, and then go through the considerations again,
} else {
// point is too old (older by 1 interval or more).
// advance until we find a point that is recent enough, and then go through the considerations again,
// if those considerations are any of the above ones.
// if the last point would end up in this branch again, discard it as well.
for p.Ts <= t-interval && i < len(in)-1 {
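The rewritten branches only restructure the comparisons, but the behaviour of Fix is easiest to see on a concrete input. The following standalone Go sketch mimics the quantization rules described in these comments; point is a stand-in for schema.Point and fixSketch is hypothetical, not the actual Fix implementation:

package main

import (
	"fmt"
	"math"
)

// point is a stand-in for schema.Point, used only for this illustration.
type point struct {
	Val float64
	Ts  uint32
}

// fixSketch mimics the rules above: the output has one slot for every
// timestamp t in [from, to) that divides by interval, a point with
// ts in (t-interval, t] is snapped ("quantized") to t, and slots
// without a matching point are filled with NaN.
// (assumes from > 0 and interval > 0 so t-interval never underflows)
func fixSketch(in []point, from, to, interval uint32) []point {
	first := ((from + interval - 1) / interval) * interval // first multiple of interval >= from
	last := ((to - 1) / interval) * interval               // last multiple of interval < to

	var out []point
	i := 0
	for t := first; t <= last; t += interval {
		// skip points that are too old for this slot (older by 1 interval or more)
		for i < len(in) && in[i].Ts <= t-interval {
			i++
		}
		if i < len(in) && in[i].Ts <= t {
			// point falls in (t-interval, t]: good enough, quantize its ts to t
			out = append(out, point{Val: in[i].Val, Ts: t})
			i++
		} else {
			// point is too recent (or absent): emit NaN and reconsider it for the next slot
			out = append(out, point{Val: math.NaN(), Ts: t})
		}
	}
	return out
}

func main() {
	in := []point{{Val: 1, Ts: 8}, {Val: 2, Ts: 19}, {Val: 3, Ts: 41}}
	// interval 10, from 5, to 55: ts 8 -> 10, 19 -> 20, slots 30 and 40 become NaN, 41 -> 50
	fmt.Println(fixSketch(in, 5, 55, 10))
}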
@@ -197,6 +198,8 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se
return out, nil
}

// getTargetsRemote issues the requests on other nodes
// it's nothing more than a thin network wrapper around getTargetsLocal of a peer.
func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]models.Req) ([]models.Series, error) {
responses := make(chan getTargetsResp, len(remoteReqs))
rCtx, cancel := context.WithCancel(ctx)
@@ -386,6 +389,9 @@ func logLoad(typ string, key schema.AMKey, from, to uint32) {
}
}

// getSeriesFixed gets the series and makes sure the output is quantized
// (needed because the raw chunks don't contain quantized data)
// TODO: we can probably forego Fix if archive > 0
func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidator consolidation.Consolidator) ([]schema.Point, error) {
select {
case <-ctx.Done():
@@ -394,6 +400,10 @@ func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidato
default:
}
rctx := newRequestContext(ctx, &req, consolidator)
// see newRequestContext for a detailed explanation of this.
if rctx.From == rctx.To {
return nil, nil
}
res, err := s.getSeries(rctx)
if err != nil {
return nil, err
@@ -408,6 +418,9 @@ func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidato
return Fix(res.Points, req.From, req.To, req.ArchInterval), nil
}

// getSeries returns points from mem (and store if needed), within the range from (inclusive) - to (exclusive)
// it can query for data within aggregated archives, by using fn min/max/sum/cnt and providing the matching agg span as interval
// pass consolidation.None as consolidator to mean read from raw interval, otherwise we'll read from aggregated series.
func (s *Server) getSeries(ctx *requestContext) (mdata.Result, error) {
res, err := s.getSeriesAggMetrics(ctx)
if err != nil {
@@ -431,6 +444,7 @@ func (s *Server) getSeries(ctx *requestContext) (mdata.Result, error) {

// if oldest < to -> search until oldest, we already have the rest from mem
// if to < oldest -> no need to search until oldest, only search until to
// adds iters from both the cache and the store (if applicable)
until := util.Min(res.Oldest, ctx.To)
fromCache, err := s.getSeriesCachedStore(ctx, until)
if err != nil {
@@ -440,10 +454,8 @@ func (s *Server) getSeries(ctx *requestContext) (mdata.Result, error) {
return res, nil
}

// getSeries returns points from mem (and cassandra if needed), within the range from (inclusive) - to (exclusive)
// it can query for data within aggregated archives, by using fn min/max/sum/cnt and providing the matching agg span as interval
// pass consolidation.None as consolidator to mean read from raw interval, otherwise we'll read from aggregated series.
// all data will also be quantized.
// itersToPoints converts the iters to points if they are within the from/to range
// TODO: just work on the result directly
func (s *Server) itersToPoints(ctx *requestContext, iters []chunk.Iter) []schema.Point {
pre := time.Now()

@@ -481,7 +493,7 @@ func (s *Server) getSeriesAggMetrics(ctx *requestContext) (mdata.Result, error)
if ctx.Cons != consolidation.None {
return metric.GetAggregated(ctx.Cons, ctx.Req.ArchInterval, ctx.From, ctx.To)
} else {
return metric.Get(ctx.From, ctx.To), nil
return metric.Get(ctx.From, ctx.To)
}
}

@@ -500,7 +512,10 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]chun
logLoad("cassan", ctx.AMKey, ctx.From, ctx.To)

log.Debug("cache: searching query key %s, from %d, until %d", ctx.AMKey, ctx.From, until)
cacheRes := s.Cache.Search(ctx.ctx, ctx.AMKey, ctx.From, until)
cacheRes, err := s.Cache.Search(ctx.ctx, ctx.AMKey, ctx.From, until)
if err != nil {
return iters, err
}
log.Debug("cache: result start %d, end %d", len(cacheRes.Start), len(cacheRes.End))

// check to see if the request has been canceled, if so abort now.
@@ -531,7 +546,7 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]chun
default:
}

// the request cannot completely be served from cache, it will require cassandra involvement
// the request cannot completely be served from cache, it will require store involvement
if !cacheRes.Complete {
if cacheRes.From != cacheRes.Until {
storeIterGens, err := s.BackendStore.Search(ctx.ctx, ctx.AMKey, ctx.Req.TTL, cacheRes.From, cacheRes.Until)
@@ -551,11 +566,11 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]chun
if err != nil {
// TODO(replay) figure out what to do if one piece is corrupt
tracing.Failure(span)
tracing.Errorf(span, "itergen: error getting iter from cassandra slice %+v", err)
tracing.Errorf(span, "itergen: error getting iter from store slice %+v", err)
return iters, err
}
// it's important that the itgens get added in chronological order,
// currently we rely on cassandra returning results in order
// currently we rely on store returning results in order
s.Cache.Add(ctx.AMKey, prevts, itgen)
prevts = itgen.Ts
iters = append(iters, *it)
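Zooming out, the flow in getSeriesCachedStore is: ask the chunk cache first, query the backend store only for the sub-range the cache could not serve, warm the cache with the store's chunks in chronological order (as the comment above requires), and return everything combined. A hedged sketch of that control flow follows; Chunk, CacheResult and the two interfaces are simplified stand-ins for this illustration, not metrictank's actual cache/store API:

package sketch

// Chunk is a simplified stand-in for metrictank's chunk/itergen types.
type Chunk struct {
	T0   uint32
	Data []byte
}

// CacheResult is a simplified stand-in for the result of Cache.Search.
type CacheResult struct {
	Start    []Chunk // chunks the cache holds at the start of the range
	End      []Chunk // chunks the cache holds at the end of the range
	From     uint32  // start of the gap the cache could not serve
	Until    uint32  // end of the gap the cache could not serve
	Complete bool    // true if Start already covers the whole range
}

type cache interface {
	Search(from, until uint32) (CacheResult, error)
	Add(prevT0 uint32, c Chunk)
}

type store interface {
	Search(from, until uint32) ([]Chunk, error)
}

// cacheThenStore sketches the control flow above: cache hits first,
// then the store for the uncovered middle range (warming the cache
// along the way), then whatever the cache had at the end of the range.
func cacheThenStore(c cache, s store, from, until uint32) ([]Chunk, error) {
	res, err := c.Search(from, until)
	if err != nil {
		return nil, err
	}
	out := append([]Chunk{}, res.Start...)
	if !res.Complete && res.From != res.Until {
		fromStore, err := s.Search(res.From, res.Until)
		if err != nil {
			return nil, err
		}
		// add chunks in chronological order; prevT0 links each chunk to
		// its predecessor so the cache can keep its chunks contiguous
		var prevT0 uint32
		if n := len(res.Start); n > 0 {
			prevT0 = res.Start[n-1].T0
		}
		for _, ch := range fromStore {
			c.Add(prevT0, ch)
			prevT0 = ch.T0
			out = append(out, ch)
		}
	}
	return append(out, res.End...), nil
}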
@@ -656,8 +671,32 @@ func newRequestContext(ctx context.Context, req *models.Req, consolidator consol
// STORED DATA 0[----------60][---------120][---------180][---------240] but data for 60 may be at 1..60, data for 120 at 61..120 and for 180 at 121..180 (due to quantizing)
// to retrieve the stored data, we also use from inclusive and to exclusive,
// so to make sure that the data after quantization (Fix()) is correct, we have to make the following adjustment:
// `from` 1..60 needs data 1..60 -> always adjust `from` to previous boundary+1 (here 1)
// `to` 181..240 needs data 121..180 -> always adjust `to` to previous boundary+1 (here 181)
// `from` 1..60 needs data 1..60 -> to assure we can read that data we adjust `from` to previous boundary+1 (here 1). (will be quantized to next boundary in this case 60)
// `to` 181..240 needs data 121..180 -> to avoid reading needless data we adjust `to` to previous boundary+1 (here 181), last ts returned must be 180

// except... there's a special case. let's say archinterval=60, and user requests:
// from=25, to=36
// we know that in the graphite model there will be no data in that timeframe:
// maybe the user submitted a point with ts=30 but it will be quantized to ts=60 so it is out of range.
// but wouldn't it be nice to include it anyway?
// we can argue both ways, but the other thing is that if we apply the logic above, we end up with:
// from := 1
// to := 1
// which is a query not accepted by AggMetric, Ccache or store. (from must be less than to)
// the only way we can get acceptable queries is for from to be 1 and to to remain 36 (or become 60 or 61)
// such a fetch request would include the requested point
// but we know Fix() will later create the output according to these rules:
// * first point should have the first timestamp >= from that divides by interval (that would be 60 in this case)
// * last point should have the last timestamp < to that divides by interval (because to is always exclusive) (that would be 0 in this case)
// which wouldn't make sense of course. one could argue it should output one point with ts=60,
// but to do that, we have to "broaden" the `to` requested by the user, covering a larger time frame they didn't actually request.
// and we deviate from the quantizing model.
// I think we should just stick to the quantizing model

// we can do the logic above backwards: if from and to are adjusted to the same value, such as 181, it means `from` was 181..240 and `to` was 181..240
// which is either a nonsensical request (from > to, from == to) or from < to but such that the requested timeframe falls in between two adjacent quantized
// timestamps and could not include either of them.
// so the caller can just compare rc.From and rc.To and if equal, immediately return [] to the client.

if consolidator == consolidation.None {
rc.From = prevBoundary(req.From, req.ArchInterval) + 1
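The upshot of the comment block above is a small piece of arithmetic plus the new From == To guard in getSeriesFixed. A minimal sketch of it, where prevBoundarySketch and adjustRange are hypothetical names; prevBoundarySketch is a plausible reading of the prevBoundary helper used above, and the real implementation may be written differently:

package main

import "fmt"

// prevBoundarySketch returns the last quantization boundary strictly
// before ts; a plausible reading of prevBoundary, consistent with the
// comment above (the real helper may be written differently).
func prevBoundarySketch(ts, interval uint32) uint32 {
	return ((ts - 1) / interval) * interval
}

// adjustRange applies the two adjustments described above and reports
// whether the adjusted range can contain any quantized point at all.
// When from and to collapse to the same value, the requested window
// sits between two adjacent quantized timestamps, so the caller can
// return an empty result (the rc.From == rc.To check in getSeriesFixed).
func adjustRange(from, to, interval uint32) (uint32, uint32, bool) {
	f := prevBoundarySketch(from, interval) + 1
	t := prevBoundarySketch(to, interval) + 1
	return f, t, f != t
}

func main() {
	fmt.Println(adjustRange(25, 36, 60))   // 1 1 false -> nothing to fetch
	fmt.Println(adjustRange(181, 240, 60)) // 181 181 false -> nothing to fetch
	fmt.Println(adjustRange(50, 200, 60))  // 1 181 true -> output points at ts 60, 120, 180
}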
145 changes: 77 additions & 68 deletions api/dataprocessor_test.go
@@ -447,17 +447,24 @@ func generateChunks(span uint32, start uint32, end uint32) []chunk.Chunk {
var chunks []chunk.Chunk

c := chunk.New(start)
for i := start; i < end; i++ {
c.Push(i, float64((i-start)*2))
if (i+1)%span == 0 {
// Mark the chunk that just got finished as finished
for ts := start; ts < end; ts++ {
val := float64((ts - start) * 2)
c.Push(ts, val)
// handle the case of this being the last point for this chunk
if (ts+1)%span == 0 {
c.Finish()
chunks = append(chunks, *c)
if i < end {
c = chunk.New(i + 1)
// if there will be a next iteration, prepare the chunk
if ts+1 < end {
c = chunk.New(ts + 1)
}
}
}
// if end was not quantized we have to finish the last chunk
if !c.Closed {
c.Finish()
chunks = append(chunks, *c)
}
return chunks
}
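For the values used further down in TestGetSeriesCachedStore (span 600, start 600, end 600*11), the rewritten helper yields ten chunks with t0 600, 1200, ..., 6000, each holding 600 points whose value is (ts-start)*2; the final !c.Closed branch only kicks in when end does not fall on a span boundary. A short usage sketch, assumed to live in the same test package with fmt imported (not part of the actual test file):

// exampleGenerateChunks is a hypothetical helper illustrating the output shape.
func exampleGenerateChunks() {
	chunks := generateChunks(600, 600, 600*11)
	fmt.Println(len(chunks))         // 10
	fmt.Println(chunks[0].Series.T0) // 600
	fmt.Println(chunks[9].Series.T0) // 6000

	// end not on a span boundary: the last, partially filled chunk
	// is finished by the !c.Closed branch above
	partial := generateChunks(600, 600, 900)
	fmt.Println(len(partial)) // 1, covering ts 600..899
}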

@@ -479,10 +486,9 @@ func generateChunks(span uint32, start uint32, end uint32) []chunk.Chunk {
//
func TestGetSeriesCachedStore(t *testing.T) {
span := uint32(600)
// save some electrons by skipping steps that are not edge cases
steps := span / 10
start := span
// we want 10 chunks to serve the largest testcase
// they will have t0 600, 1200, ..., 5400, 6000
end := span * 11
chunks := generateChunks(span, start, end)

@@ -493,12 +499,12 @@ func TestGetSeriesCachedStore(t *testing.T) {
metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, false, 0, 0, 0)
srv.BindMemoryStore(metrics)
metric := test.GetAMKey(1)
var c *cache.CCache
var itgen *chunk.IterGen
var prevts uint32

type testcase struct {
// the pattern of chunks in store, cache or both
// the pattern of chunks
// c: in cache
// s: in store
// b: in both
Pattern string

// expected number of cache hits on query over all chunks
@@ -521,24 +527,28 @@ func TestGetSeriesCachedStore(t *testing.T) {
for _, tc := range testcases {
pattern := tc.Pattern

// last ts is start ts plus the number of spans the pattern defines
// lastTs is the t0 of the first chunk that comes after the used range
lastTs := start + span*uint32(len(pattern))

// we want to query through various ranges, including:
// - from first ts to first ts
// - from first ts to last ts
// - from last ts to last ts
// and various ranges between
for from := start; from <= lastTs; from += steps {
for to := from; to <= lastTs; to += steps {
// reinstantiate the cache at the beginning of each run
c = cache.NewCCache()
// we increment from and to in tenths of a span,
// because incrementing by 1 would be needlessly expensive
step := span / 10
for from := start; from <= lastTs; from += step {
for to := from; to <= lastTs; to += step {
// use fresh store and cache
c := cache.NewCCache()
srv.BindCache(c)
store.Reset()

// populate cache and store according to pattern definition
prevts = 0
var prevts uint32
for i := 0; i < len(tc.Pattern); i++ {
itgen = chunk.NewBareIterGen(chunks[i].Series.Bytes(), chunks[i].Series.T0, span)
itgen := chunk.NewBareIterGen(chunks[i].Series.Bytes(), chunks[i].Series.T0, span)
if pattern[i] == 'c' || pattern[i] == 'b' {
c.Add(metric, prevts, *itgen)
}
@@ -554,6 +564,14 @@ func TestGetSeriesCachedStore(t *testing.T) {
req.ArchInterval = 1
ctx := newRequestContext(test.NewContext(), &req, consolidation.None)
iters, err := srv.getSeriesCachedStore(ctx, to)

// test invalid query; from must be less than to
if from == to {
if err == nil {
t.Fatalf("Pattern %s From=To %d: expected err, got nil", pattern, from)
}
continue
}
if err != nil {
t.Fatalf("Pattern %s From %d To %d: error %s", pattern, from, to, err)
}
@@ -568,7 +586,7 @@ func TestGetSeriesCachedStore(t *testing.T) {
// we use the tsTracker to increase together with the iterators and compare at each step
tsTracker := expectResFrom

tsSlice := make([]uint32, 0)
var tsSlice []uint32
for i, it := range iters {
for it.Next() {
ts, _ := it.Values()
@@ -580,74 +598,66 @@ func TestGetSeriesCachedStore(t *testing.T) {
}
}

if to-from > 0 {
if len(tsSlice) == 0 {
t.Fatalf("Pattern %s From %d To %d; Should have >0 results but got 0", pattern, from, to)
}
if tsSlice[0] != expectResFrom {
t.Fatalf("Pattern %s From %d To %d; Expected first to be %d but got %d", pattern, from, to, expectResFrom, tsSlice[0])
}
if tsSlice[len(tsSlice)-1] != expectResTo {
t.Fatalf("Pattern %s From %d To %d; Expected last to be %d but got %d", pattern, from, to, expectResTo, tsSlice[len(tsSlice)-1])
}
} else if len(tsSlice) > 0 {
t.Fatalf("Pattern %s From %d To %d; Expected results to have len 0 but got %d", pattern, from, to, len(tsSlice))
if len(tsSlice) == 0 {
t.Fatalf("Pattern %s From %d To %d; Should have >0 results but got 0", pattern, from, to)
}
if tsSlice[0] != expectResFrom {
t.Fatalf("Pattern %s From %d To %d; Expected first to be %d but got %d", pattern, from, to, expectResFrom, tsSlice[0])
}
if tsSlice[len(tsSlice)-1] != expectResTo {
t.Fatalf("Pattern %s From %d To %d; Expected last to be %d but got %d", pattern, from, to, expectResTo, tsSlice[len(tsSlice)-1])
}

expectedHits := uint32(0)
complete := false
// because ranges are exclusive at the end we'll test for to - 1
exclTo := to - 1

// if from is equal to to, we always expect 0 hits
if from != to {
// seek hits from beginning of the searched range within the given pattern
for i := 0; i < len(pattern); i++ {

// seek hits from beginning of the searched range within the given pattern
for i := 0; i < len(pattern); i++ {
// if pattern index is lower than from's chunk we continue
if from-(from%span) > start+uint32(i)*span {
continue
}

// current pattern index is a cache hit, so we expect one more
if pattern[i] == 'c' || pattern[i] == 'b' {
expectedHits++
} else {
break
}

// if pattern index is lower than from's chunk we continue
if from-(from%span) > start+uint32(i)*span {
// if we've already seeked beyond to's pattern we break and mark the seek as complete
if exclTo-(exclTo%span) == start+uint32(i)*span {
complete = true
break
}
}

// only if the previous seek was not complete do we launch one from the other end
if !complete {

// now the same from the other end (just like the cache searching does)
for i := len(pattern) - 1; i >= 0; i-- {

// if pattern index is above to's chunk we continue
if exclTo-(exclTo%span)+span <= start+uint32(i)*span {
continue
}

// current pattern index is a cache hit, so we expect one more
// current pattern index is a cache hit, so we expect one more
if pattern[i] == 'c' || pattern[i] == 'b' {
expectedHits++
} else {
break
}

// if we've already seeked beyond to's pattern we break and mark the seek as complete
if exclTo-(exclTo%span) == start+uint32(i)*span {
complete = true
// if we've already seeked beyond from's pattern we break
if from-(from%span) == start+uint32(i)*span {
break
}
}

// only if the previous seek was not complete do we launch one from the other end
if !complete {

// now the same from the other end (just like the cache searching does)
for i := len(pattern) - 1; i >= 0; i-- {

// if pattern index is above to's chunk we continue
if exclTo-(exclTo%span)+span <= start+uint32(i)*span {
continue
}

// current pattern index is a cache hit, so we expect one more
if pattern[i] == 'c' || pattern[i] == 'b' {
expectedHits++
} else {
break
}

// if we've already seeked beyond from's pattern we break
if from-(from%span) == start+uint32(i)*span {
break
}
}
}
}

// verify we got all cache hits we should have
@@ -659,7 +669,6 @@ func TestGetSeriesCachedStore(t *testing.T) {

// stop cache go routines before reinstantiating it at the top of the loop
c.Stop()
store.Reset()
}
}
}
