This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Timestamp fixes etc #919

Merged · 10 commits · May 20, 2018
69 changes: 54 additions & 15 deletions api/dataprocessor.go
@@ -93,12 +93,13 @@ func Fix(in []schema.Point, from, to, interval uint32) []schema.Point {
} else if p.Ts > t {
// point is too recent, append a null and reconsider same point for next slot
out[o] = schema.Point{Val: math.NaN(), Ts: t}
} else if p.Ts > t-interval && p.Ts < t {
// point is a bit older, so it's good enough, just quantize the ts, and move on to next point for next round
} else if p.Ts > t-interval {
// point is older but not by more than 1 interval, so it's good enough, just quantize the ts, and move on to next point for next round
out[o] = schema.Point{Val: p.Val, Ts: t}
i++
} else if p.Ts <= t-interval {
// point is too old. advance until we find a point that is recent enough, and then go through the considerations again,
} else {
// point is too old (older by 1 interval or more).
// advance until we find a point that is recent enough, and then go through the considerations again,
// if those considerations are any of the above ones.
// if the last point would end up in this branch again, discard it as well.
for p.Ts <= t-interval && i < len(in)-1 {
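
Editor's note: to make the branch logic above concrete, here is a minimal standalone sketch of the same quantization rule (an illustration under stated assumptions, not the PR's code: it uses a local point type and assumes from > 0 and interval > 0). Each output slot t is a multiple of interval with from <= t < to; it receives the first input point p satisfying t-interval < p.Ts <= t, or NaN when no point falls in that window.

package sketch

import "math"

type point struct {
	Val float64
	Ts  uint32
}

func quantize(in []point, from, to, interval uint32) []point {
	// first output timestamp: the first multiple of interval >= from
	first := ((from + interval - 1) / interval) * interval
	var out []point
	i := 0
	for t := first; t < to; t += interval {
		// skip points that are at least one interval too old for this slot
		for i < len(in) && in[i].Ts <= t-interval {
			i++
		}
		p := point{Val: math.NaN(), Ts: t}
		if i < len(in) && in[i].Ts <= t {
			// in[i] falls in (t-interval, t]: good enough, quantize it onto t
			p.Val = in[i].Val
			i++
		}
		out = append(out, p)
	}
	return out
}

For instance, with interval=60, from=1, to=181, input points at Ts 55 and 130 land on slots 60 and 180, with a NaN filling slot 120.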
@@ -197,6 +198,8 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se
return out, nil
}

// getTargetsRemote issues the requests on other nodes
// it's nothing more than a thin network wrapper around getTargetsLocal of a peer.
func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]models.Req) ([]models.Series, error) {
responses := make(chan getTargetsResp, len(remoteReqs))
rCtx, cancel := context.WithCancel(ctx)
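
Editor's note: the fan-out pattern visible here can be sketched generically as follows (a hedged reconstruction; fetch, resp, and the []string payload are stand-ins for the real peer call and []models.Series, not the PR's types). The response channel is buffered to len(peers) so no sender blocks after an early return, and the deferred cancel stops in-flight peers once the first error is seen.

package sketch

import "context"

type resp struct {
	series []string // stand-in for []models.Series
	err    error
}

// fanOut queries every peer concurrently and fails fast on the first error.
func fanOut(ctx context.Context, peers []string,
	fetch func(context.Context, string) ([]string, error)) ([]string, error) {

	// buffered so that no sender blocks if we return before draining
	responses := make(chan resp, len(peers))
	rCtx, cancel := context.WithCancel(ctx)
	defer cancel() // cancels any peers still in flight when we return

	for _, peer := range peers {
		go func(peer string) {
			s, err := fetch(rCtx, peer)
			responses <- resp{s, err}
		}(peer)
	}

	var out []string
	for range peers {
		r := <-responses
		if r.err != nil {
			return nil, r.err // remaining goroutines are canceled via the defer
		}
		out = append(out, r.series...)
	}
	return out, nil
}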
@@ -386,6 +389,9 @@ func logLoad(typ string, key schema.AMKey, from, to uint32) {
}
}

// getSeriesFixed gets the series and makes sure the output is quantized
// (needed because the raw chunks don't contain quantized data)
// TODO: we can probably forego Fix if archive > 0
func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidator consolidation.Consolidator) ([]schema.Point, error) {
select {
case <-ctx.Done():
@@ -394,6 +400,10 @@ func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidato
default:
}
rctx := newRequestContext(ctx, &req, consolidator)
// see newRequestContext for a detailed explanation of this.
if rctx.From == rctx.To {
return nil, nil
}
res, err := s.getSeries(rctx)
if err != nil {
return nil, err
@@ -408,6 +418,9 @@ func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidato
return Fix(res.Points, req.From, req.To, req.ArchInterval), nil
}

// getSeries returns points from mem (and store if needed), within the range from (inclusive) - to (exclusive)
// it can query for data within aggregated archives, by using fn min/max/sum/cnt and providing the matching agg span as interval
// pass consolidation.None as consolidator to mean read from raw interval, otherwise we'll read from aggregated series.
func (s *Server) getSeries(ctx *requestContext) (mdata.Result, error) {
res, err := s.getSeriesAggMetrics(ctx)
if err != nil {
@@ -431,6 +444,7 @@ func (s *Server) getSeries(ctx *requestContext) (mdata.Result, error) {

// if oldest < to -> search until oldest, we already have the rest from mem
// if to < oldest -> no need to search until oldest, only search until to
// adds iters from both the cache and the store (if applicable)
until := util.Min(res.Oldest, ctx.To)
fromCache, err := s.getSeriesCachedStore(ctx, until)
if err != nil {
@@ -440,10 +454,8 @@ func (s *Server) getSeries(ctx *requestContext) (mdata.Result, error) {
return res, nil
}

// getSeries returns points from mem (and cassandra if needed), within the range from (inclusive) - to (exclusive)
// it can query for data within aggregated archives, by using fn min/max/sum/cnt and providing the matching agg span as interval
// pass consolidation.None as consolidator to mean read from raw interval, otherwise we'll read from aggregated series.
// all data will also be quantized.
// itersToPoints converts the iters to points if they are within the from/to range
// TODO: just work on the result directly
func (s *Server) itersToPoints(ctx *requestContext, iters []chunk.Iter) []schema.Point {
pre := time.Now()

@@ -481,7 +493,7 @@ func (s *Server) getSeriesAggMetrics(ctx *requestContext) (mdata.Result, error)
if ctx.Cons != consolidation.None {
return metric.GetAggregated(ctx.Cons, ctx.Req.ArchInterval, ctx.From, ctx.To)
} else {
return metric.Get(ctx.From, ctx.To), nil
return metric.Get(ctx.From, ctx.To)
}
}

@@ -500,7 +512,10 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]chun
logLoad("cassan", ctx.AMKey, ctx.From, ctx.To)

log.Debug("cache: searching query key %s, from %d, until %d", ctx.AMKey, ctx.From, until)
cacheRes := s.Cache.Search(ctx.ctx, ctx.AMKey, ctx.From, until)
cacheRes, err := s.Cache.Search(ctx.ctx, ctx.AMKey, ctx.From, until)
if err != nil {
return iters, err
}
log.Debug("cache: result start %d, end %d", len(cacheRes.Start), len(cacheRes.End))

// check to see if the request has been canceled, if so abort now.
@@ -531,7 +546,7 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]chun
default:
}

// the request cannot completely be served from cache, it will require cassandra involvement
// the request cannot completely be served from cache, it will require store involvement
if !cacheRes.Complete {
if cacheRes.From != cacheRes.Until {
storeIterGens, err := s.BackendStore.Search(ctx.ctx, ctx.AMKey, ctx.Req.TTL, cacheRes.From, cacheRes.Until)
@@ -551,11 +566,11 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]chun
if err != nil {
// TODO(replay) figure out what to do if one piece is corrupt
tracing.Failure(span)
tracing.Errorf(span, "itergen: error getting iter from cassandra slice %+v", err)
tracing.Errorf(span, "itergen: error getting iter from store slice %+v", err)
return iters, err
}
// it's important that the itgens get added in chronological order,
// currently we rely on cassandra returning results in order
// currently we rely on store returning results in order
s.Cache.Add(ctx.AMKey, prevts, itgen)
prevts = itgen.Ts
iters = append(iters, *it)
@@ -656,8 +671,32 @@ func newRequestContext(ctx context.Context, req *models.Req, consolidator consol
// STORED DATA 0[----------60][---------120][---------180][---------240] but data for 60 may be at 1..60, data for 120 at 61..120 and for 180 at 121..180 (due to quantizing)
// to retrieve the stored data, we also use from inclusive and to exclusive,
// so to make sure that the data after quantization (Fix()) is correct, we have to make the following adjustment:
// `from` 1..60 needs data 1..60 -> always adjust `from` to previous boundary+1 (here 1)
// `to` 181..240 needs data 121..180 -> always adjust `to` to previous boundary+1 (here 181)
// `from` 1..60 needs data 1..60 -> to assure we can read that data we adjust `from` to previous boundary+1 (here 1). (will be quantized to next boundary in this case 60)
// `to` 181..240 needs data 121..180 -> to avoid reading needless data we adjust `to` to previous boundary+1 (here 181), last ts returned must be 180

// except... there's a special case. let's say archInterval=60, and the user requests:
// from=25, to=36
// we know that in the graphite model there will be no data in that timeframe:
// maybe the user submitted a point with ts=30 but it will be quantized to ts=60 so it is out of range.
// but wouldn't it be nice to include it anyway?
// we can argue both ways, but the other thing is that if we apply the logic above, we end up with:
// from := 1
// to := 1
// which is a query not accepted by AggMetric, Ccache or store. (from must be less than to)
// the only way we can get acceptable queries is for from to be 1 and to to remain 36 (or become 60 or 61)
// such a fetch request would include the requested point
// but we know Fix() will later create the output according to these rules:
// * first point should have the first timestamp >= from that divides by interval (that would be 60 in this case)
// * last point should have the last timestamp < to that divides by interval (because to is always exclusive) (that would be 0 in this case)
// which wouldn't make sense of course. one could argue it should output one point with ts=60,
// but to do that, we have to "broaden" the `to` requested by the user, covering a larger time frame they didn't actually request.
// and we deviate from the quantizing model.
// I think we should just stick to the quantizing model

// we can do the logic above backwards: if from and to are adjusted to the same value, such as 181, it means `from` was 181..240 and `to` was 181..240
// which is either a nonsensical request (from > to, from == to) or from < to but such that the requested timeframe falls in between two adjacent quantized
// timestamps and could not include either of them.
// so the caller can just compare rc.From and rc.To and if equal, immediately return [] to the client.

if consolidator == consolidation.None {
rc.From = prevBoundary(req.From, req.ArchInterval) + 1
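
Editor's note: a standalone sketch of the adjustment spelled out in the comment above (a hypothetical reconstruction: prevBoundary mirrors the name used in the code, but its body is inferred from the comment's examples, not copied from the PR; assumes ts >= 1 and span > 0).

package sketch

// prevBoundary returns the last multiple of span strictly below ts,
// e.g. prevBoundary(1, 60) == 0, prevBoundary(60, 60) == 0, prevBoundary(61, 60) == 60.
func prevBoundary(ts, span uint32) uint32 {
	return ts - ((ts-1)%span + 1)
}

// adjust maps a user request [from, to) onto the stored, quantized data,
// per the rules above: both ends move to "previous boundary + 1".
func adjust(from, to, interval uint32) (uint32, uint32) {
	return prevBoundary(from, interval) + 1, prevBoundary(to, interval) + 1
}

For the special case discussed above, adjust(25, 36, 60) yields (1, 1), which is exactly the rc.From == rc.To condition that getSeriesFixed short-circuits on instead of issuing an invalid from >= to fetch.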
145 changes: 77 additions & 68 deletions api/dataprocessor_test.go
@@ -447,17 +447,24 @@ func generateChunks(span uint32, start uint32, end uint32) []chunk.Chunk {
var chunks []chunk.Chunk

c := chunk.New(start)
for i := start; i < end; i++ {
c.Push(i, float64((i-start)*2))
if (i+1)%span == 0 {
// Mark the chunk that just got finished as finished
for ts := start; ts < end; ts++ {
val := float64((ts - start) * 2)
c.Push(ts, val)
// handle the case of this being the last point for this chunk
if (ts+1)%span == 0 {
c.Finish()
chunks = append(chunks, *c)
if i < end {
c = chunk.New(i + 1)
// if there will be a next iteration, prepare the chunk
if ts+1 < end {
c = chunk.New(ts + 1)
}
}
}
// if end was not quantized we have to finish the last chunk
if !c.Closed {
c.Finish()
chunks = append(chunks, *c)
}
return chunks
}
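
Editor's note: a quick sanity check of the invariant the loop above establishes (a hypothetical test, not part of the PR; it assumes the surrounding test package with generateChunks and the chunk type as shown). With span=600, start=600, end=6600 -- the parameters TestGetSeriesCachedStore uses below -- every chunk covers one full span, giving 10 chunks with t0 600, 1200, ..., 6000.

func TestGenerateChunksSketch(t *testing.T) {
	chunks := generateChunks(600, 600, 6600)
	if len(chunks) != 10 {
		t.Fatalf("expected 10 chunks, got %d", len(chunks))
	}
	for i, c := range chunks {
		// each chunk's t0 advances by exactly one span
		if want := uint32(600 + i*600); c.Series.T0 != want {
			t.Fatalf("chunk %d: expected t0 %d, got %d", i, want, c.Series.T0)
		}
	}
}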

@@ -479,10 +486,9 @@ func generateChunks(span uint32, start uint32, end uint32) []chunk.Chunk {
//
func TestGetSeriesCachedStore(t *testing.T) {
span := uint32(600)
// save some electrons by skipping steps that are not edge cases
steps := span / 10
start := span
// we want 10 chunks to serve the largest testcase
// they will have t0 600, 1200, ..., 5400, 6000
end := span * 11
chunks := generateChunks(span, start, end)

@@ -493,12 +499,12 @@ func TestGetSeriesCachedStore(t *testing.T) {
metrics := mdata.NewAggMetrics(store, &cache.MockCache{}, false, 0, 0, 0)
srv.BindMemoryStore(metrics)
metric := test.GetAMKey(1)
var c *cache.CCache
var itgen *chunk.IterGen
var prevts uint32

type testcase struct {
// the pattern of chunks in store, cache or both
// the pattern of chunks
// c: in cache
// s: in store
// b: in both
Pattern string

// expected number of cache hits on query over all chunks
@@ -521,24 +527,28 @@
for _, tc := range testcases {
pattern := tc.Pattern

// last ts is start ts plus the number of spans the pattern defines
// lastTs is the t0 of the first chunk that comes after the used range
lastTs := start + span*uint32(len(pattern))

// we want to query through various ranges, including:
// - from first ts to first ts
// - from first ts to last ts
// - from last ts to last ts
// and various ranges between
for from := start; from <= lastTs; from += steps {
for to := from; to <= lastTs; to += steps {
// reinstantiate the cache at the beginning of each run
c = cache.NewCCache()
// we increment from and to in tenths of a span,
// because incrementing by 1 would be needlessly expensive
step := span / 10
for from := start; from <= lastTs; from += step {
for to := from; to <= lastTs; to += step {
// use fresh store and cache
c := cache.NewCCache()
srv.BindCache(c)
store.Reset()

// populate cache and store according to pattern definition
prevts = 0
var prevts uint32
for i := 0; i < len(tc.Pattern); i++ {
itgen = chunk.NewBareIterGen(chunks[i].Series.Bytes(), chunks[i].Series.T0, span)
itgen := chunk.NewBareIterGen(chunks[i].Series.Bytes(), chunks[i].Series.T0, span)
if pattern[i] == 'c' || pattern[i] == 'b' {
c.Add(metric, prevts, *itgen)
}
@@ -554,6 +564,14 @@
req.ArchInterval = 1
ctx := newRequestContext(test.NewContext(), &req, consolidation.None)
iters, err := srv.getSeriesCachedStore(ctx, to)

// test invalid query; from must be less than to
if from == to {
if err == nil {
t.Fatalf("Pattern %s From=To %d: expected err, got nil", pattern, from)
}
continue
}
if err != nil {
t.Fatalf("Pattern %s From %d To %d: error %s", pattern, from, to, err)
}
@@ -568,7 +586,7 @@
// we use the tsTracker to increase together with the iterators and compare at each step
tsTracker := expectResFrom

tsSlice := make([]uint32, 0)
var tsSlice []uint32
for i, it := range iters {
for it.Next() {
ts, _ := it.Values()
@@ -580,74 +598,66 @@
}
}

if to-from > 0 {
if len(tsSlice) == 0 {
t.Fatalf("Pattern %s From %d To %d; Should have >0 results but got 0", pattern, from, to)
}
if tsSlice[0] != expectResFrom {
t.Fatalf("Pattern %s From %d To %d; Expected first to be %d but got %d", pattern, from, to, expectResFrom, tsSlice[0])
}
if tsSlice[len(tsSlice)-1] != expectResTo {
t.Fatalf("Pattern %s From %d To %d; Expected last to be %d but got %d", pattern, from, to, expectResTo, tsSlice[len(tsSlice)-1])
}
} else if len(tsSlice) > 0 {
t.Fatalf("Pattern %s From %d To %d; Expected results to have len 0 but got %d", pattern, from, to, len(tsSlice))
if len(tsSlice) == 0 {
t.Fatalf("Pattern %s From %d To %d; Should have >0 results but got 0", pattern, from, to)
}
if tsSlice[0] != expectResFrom {
t.Fatalf("Pattern %s From %d To %d; Expected first to be %d but got %d", pattern, from, to, expectResFrom, tsSlice[0])
}
if tsSlice[len(tsSlice)-1] != expectResTo {
t.Fatalf("Pattern %s From %d To %d; Expected last to be %d but got %d", pattern, from, to, expectResTo, tsSlice[len(tsSlice)-1])
}

expectedHits := uint32(0)
complete := false
// because ranges are exclusive at the end we'll test for to - 1
exclTo := to - 1

// if from is equal to we always expect 0 hits
if from != to {
// seek hits from beginning of the searched range within the given pattern
for i := 0; i < len(pattern); i++ {

// seek hits from beginning of the searched range within the given pattern
for i := 0; i < len(pattern); i++ {
// if pattern index is lower than from's chunk we continue
if from-(from%span) > start+uint32(i)*span {
continue
}

// current pattern index is a cache hit, so we expect one more
if pattern[i] == 'c' || pattern[i] == 'b' {
expectedHits++
} else {
break
}

// if pattern index is lower than from's chunk we continue
if from-(from%span) > start+uint32(i)*span {
// if we've already seeked beyond to's pattern we break and mark the seek as complete
if exclTo-(exclTo%span) == start+uint32(i)*span {
complete = true
break
}
}

// only if the previous seek was not complete we launch one from the other end
if !complete {

// now the same from the other end (just like the cache searching does)
for i := len(pattern) - 1; i >= 0; i-- {

// if pattern index is above to's chunk we continue
if exclTo-(exclTo%span)+span <= start+uint32(i)*span {
continue
}

// current pattern index is a cache hit, so we expect one more
// current pattern index is a cache hit, so we expect one more
if pattern[i] == 'c' || pattern[i] == 'b' {
expectedHits++
} else {
break
}

// if we've already seeked beyond to's pattern we break and mark the seek as complete
if exclTo-(exclTo%span) == start+uint32(i)*span {
complete = true
// if we've already seeked beyond from's pattern we break
if from-(from%span) == start+uint32(i)*span {
break
}
}

// only if the previous seek was not complete we launch one from the other end
if !complete {

// now the same from the other end (just like the cache searching does)
for i := len(pattern) - 1; i >= 0; i-- {

// if pattern index is above to's chunk we continue
if exclTo-(exclTo%span)+span <= start+uint32(i)*span {
continue
}

// current pattern index is a cache hit, so we expect one more
if pattern[i] == 'c' || pattern[i] == 'b' {
expectedHits++
} else {
break
}

// if we've already seeked beyond from's pattern we break
if from-(from%span) == start+uint32(i)*span {
break
}
}
}
}

// verify we got all cache hits we should have
@@ -659,7 +669,6 @@

// stop cache go routines before reinstantiating it at the top of the loop
c.Stop()
store.Reset()
}
}
}