This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Jaeger cleanup: much fewer spans, but with more stats - and more stats for meta section #1380

Merged
merged 9 commits on Jul 10, 2019
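This capture carries no PR description, so what follows is a reviewer's sketch, not the author's code: the diff replaces the per-request tracing spans with a single models.StorageStats accumulator that is threaded through getTargetsLocal, getTarget, getSeries and getSeriesCachedStore, and whose totals are tagged onto one span via ss.Trace(span). The block below approximates that type under stated assumptions: only the method names (Add, Trace, IncChunksFromTank, IncChunksFromCache, IncChunksFromStore, IncCacheResult) are taken from the diff, while the field names, the atomic layout, the CacheResultType values, and the simplified SetTag parameter (a stand-in for an opentracing span, whose real SetTag returns the span) are guesses for illustration.

```go
// Package sketch is NOT the real metrictank code: it is a hypothetical,
// self-contained approximation of the StorageStats accumulator that this PR
// threads through the render path.
package sketch

import "sync/atomic"

// CacheResultType stands in for the cache package's result classification
// (the diff compares against cache.Hit; the other values are assumed).
type CacheResultType int

const (
	Hit CacheResultType = iota
	HitPartial
	Miss
)

// StorageStats counts storage-layer work done while serving one render query.
// Counters are bumped atomically because the getTarget goroutines share one instance.
type StorageStats struct {
	ChunksFromTank  uint32 // chunks read from the in-memory tank (AggMetrics)
	ChunksFromCache uint32 // chunks served by the chunk cache
	ChunksFromStore uint32 // chunks fetched from the backend store
	CacheHit        uint32 // cache lookups answered completely
	CacheHitPartial uint32 // cache lookups answered partially
	CacheMiss       uint32 // cache lookups that found nothing
}

func (ss *StorageStats) IncChunksFromTank(n uint32)  { atomic.AddUint32(&ss.ChunksFromTank, n) }
func (ss *StorageStats) IncChunksFromCache(n uint32) { atomic.AddUint32(&ss.ChunksFromCache, n) }
func (ss *StorageStats) IncChunksFromStore(n uint32) { atomic.AddUint32(&ss.ChunksFromStore, n) }

// IncCacheResult classifies one chunk-cache lookup.
func (ss *StorageStats) IncCacheResult(t CacheResultType) {
	switch t {
	case Hit:
		atomic.AddUint32(&ss.CacheHit, 1)
	case HitPartial:
		atomic.AddUint32(&ss.CacheHitPartial, 1)
	default:
		atomic.AddUint32(&ss.CacheMiss, 1)
	}
}

// Add folds stats reported by a peer (see GetDataRespV1 below) into the local totals.
func (ss *StorageStats) Add(other *StorageStats) {
	atomic.AddUint32(&ss.ChunksFromTank, atomic.LoadUint32(&other.ChunksFromTank))
	atomic.AddUint32(&ss.ChunksFromCache, atomic.LoadUint32(&other.ChunksFromCache))
	atomic.AddUint32(&ss.ChunksFromStore, atomic.LoadUint32(&other.ChunksFromStore))
	atomic.AddUint32(&ss.CacheHit, atomic.LoadUint32(&other.CacheHit))
	atomic.AddUint32(&ss.CacheHitPartial, atomic.LoadUint32(&other.CacheHitPartial))
	atomic.AddUint32(&ss.CacheMiss, atomic.LoadUint32(&other.CacheMiss))
}

// Trace reports the totals once, as tags on the single getTargetsLocal span,
// replacing the per-request child spans that the old code emitted.
func (ss *StorageStats) Trace(span interface{ SetTag(key string, value interface{}) }) {
	span.SetTag("chunks-from-tank", atomic.LoadUint32(&ss.ChunksFromTank))
	span.SetTag("chunks-from-cache", atomic.LoadUint32(&ss.ChunksFromCache))
	span.SetTag("chunks-from-store", atomic.LoadUint32(&ss.ChunksFromStore))
	span.SetTag("cache-hit", atomic.LoadUint32(&ss.CacheHit))
	span.SetTag("cache-hit-partial", atomic.LoadUint32(&ss.CacheHitPartial))
	span.SetTag("cache-miss", atomic.LoadUint32(&ss.CacheMiss))
}
```

The payoff shows up further down in api/dataprocessor.go: getTarget, getSeriesAggMetrics and getSeriesCachedStore no longer open their own spans, so a query that fans out to hundreds of series produces one span carrying aggregate counters instead of hundreds of child spans.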
15 changes: 15 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default.

5 changes: 3 additions & 2 deletions api/cluster.go
@@ -339,15 +339,16 @@ func (s *Server) indexList(ctx *middleware.Context, req models.IndexList) {
}

func (s *Server) getData(ctx *middleware.Context, request models.GetData) {
series, err := s.getTargetsLocal(ctx.Req.Context(), request.Requests)
var ss models.StorageStats
series, err := s.getTargetsLocal(ctx.Req.Context(), &ss, request.Requests)
if err != nil {
// the only errors returned are from us catching panics, so we should treat them
// all as internalServerErrors
log.Errorf("HTTP getData() %s", err.Error())
response.Write(ctx, response.WrapError(err))
return
}
response.Write(ctx, response.NewMsgp(200, &models.GetDataResp{Series: series}))
response.Write(ctx, response.NewMsgp(200, &models.GetDataRespV1{Stats: ss, Series: series}))
}

func (s *Server) indexDelete(ctx *middleware.Context, req models.IndexDelete) {
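The hunk above also switches the cluster getData response from GetDataResp to GetDataRespV1 so that each peer returns its storage stats alongside the series; getTargetsRemote in api/dataprocessor.go below then merges them with ss.Add(&resp.Stats). A rough, assumed shape of that response, continuing the sketch package from above (the real type is msgp-generated in api/models, and Point/Series are only stand-ins for schema.Point and models.Series):

```go
package sketch

// Point and Series are illustrative stand-ins for schema.Point and models.Series.
type Point struct {
	Val float64
	Ts  uint32
}

type Series struct {
	Target     string
	Datapoints []Point
	Interval   uint32
}

// GetDataRespV1 is the versioned getData reply: the same Series payload as the
// old GetDataResp, plus the StorageStats the peer accumulated while serving it,
// so the caller can fold them into its own accumulator.
type GetDataRespV1 struct {
	Stats  StorageStats
	Series []Series
}
```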
90 changes: 43 additions & 47 deletions api/dataprocessor.go
@@ -12,6 +12,7 @@ import (
"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/consolidation"
"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/mdata/cache"
"github.com/grafana/metrictank/mdata/chunk/tsz"
"github.com/grafana/metrictank/tracing"
"github.com/grafana/metrictank/util"
@@ -147,7 +148,7 @@ func divide(pointsA, pointsB []schema.Point) []schema.Point {
return pointsA
}

func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Series, error) {
func (s *Server) getTargets(ctx context.Context, ss *models.StorageStats, reqs []models.Req) ([]models.Series, error) {
// split reqs into local and remote.
localReqs := make([]models.Req, 0)
remoteReqs := make(map[string][]models.Req)
@@ -169,7 +170,7 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se
go func() {
// the only errors returned are from us catching panics, so we should treat them
// all as internalServerErrors
series, err := s.getTargetsLocal(getCtx, localReqs)
series, err := s.getTargetsLocal(getCtx, ss, localReqs)
if err != nil {
cancel()
}
@@ -181,7 +182,7 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se
wg.Add(1)
go func() {
// all errors returned are *response.Error.
series, err := s.getTargetsRemote(getCtx, remoteReqs)
series, err := s.getTargetsRemote(getCtx, ss, remoteReqs)
if err != nil {
cancel()
}
@@ -209,7 +210,7 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se

// getTargetsRemote issues the requests on other nodes
// it's nothing more than a thin network wrapper around getTargetsLocal of a peer.
func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]models.Req) ([]models.Series, error) {
func (s *Server) getTargetsRemote(ctx context.Context, ss *models.StorageStats, remoteReqs map[string][]models.Req) ([]models.Series, error) {
responses := make(chan getTargetsResp, len(remoteReqs))
rCtx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -226,7 +227,7 @@ func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]m
responses <- getTargetsResp{nil, err}
return
}
var resp models.GetDataResp
var resp models.GetDataRespV1
_, err = resp.UnmarshalMsg(buf)
if err != nil {
cancel()
@@ -235,6 +236,7 @@ func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]m
return
}
log.Debugf("DP getTargetsRemote: %s returned %d series", node.GetName(), len(resp.Series))
ss.Add(&resp.Stats)
responses <- getTargetsResp{resp.Series, nil}
}(nodeReqs)
}
@@ -257,14 +259,17 @@ func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]m
}

// error is the error of the first failing target request
func (s *Server) getTargetsLocal(ctx context.Context, reqs []models.Req) ([]models.Series, error) {
func (s *Server) getTargetsLocal(ctx context.Context, ss *models.StorageStats, reqs []models.Req) ([]models.Series, error) {
log.Debugf("DP getTargetsLocal: handling %d reqs locally", len(reqs))
rCtx, span := tracing.NewSpan(ctx, s.Tracer, "getTargetsLocal")
defer span.Finish()
span.SetTag("num_reqs", len(reqs))
responses := make(chan getTargetsResp, len(reqs))

var wg sync.WaitGroup
reqLimiter := util.NewLimiter(getTargetsConcurrency)

rCtx, cancel := context.WithCancel(ctx)
rCtx, cancel := context.WithCancel(rCtx)
defer cancel()
LOOP:
for _, req := range reqs {
@@ -276,12 +281,9 @@ LOOP:
}
wg.Add(1)
go func(req models.Req) {
rCtx, span := tracing.NewSpan(rCtx, s.Tracer, "getTargetsLocal")
req.Trace(span)
pre := time.Now()
points, interval, err := s.getTarget(rCtx, req)
points, interval, err := s.getTarget(rCtx, ss, req)
if err != nil {
tags.Error.Set(span, true)
cancel() // cancel all other requests.
responses <- getTargetsResp{nil, err}
} else {
@@ -300,7 +302,6 @@ LOOP:
wg.Done()
// pop an item of our limiter so that other requests can be processed.
reqLimiter.Release()
span.Finish()
}(req)
}
go func() {
@@ -310,16 +311,18 @@ LOOP:
out := make([]models.Series, 0, len(reqs))
for resp := range responses {
if resp.err != nil {
tags.Error.Set(span, true)
return nil, resp.err
}
out = append(out, resp.series...)
}
ss.Trace(span)
log.Debugf("DP getTargetsLocal: %d series found locally", len(out))
return out, nil

}

func (s *Server) getTarget(ctx context.Context, req models.Req) (points []schema.Point, interval uint32, err error) {
func (s *Server) getTarget(ctx context.Context, ss *models.StorageStats, req models.Req) (points []schema.Point, interval uint32, err error) {
defer doRecover(&err)
readRollup := req.Archive != 0 // do we need to read from a downsampled series?
normalize := req.AggNum > 1 // do we need to normalize points at runtime?
@@ -333,21 +336,21 @@ func (s *Server) getTarget(ctx context.Context, req models.Req) (points []schema
}

if !readRollup && !normalize {
fixed, err := s.getSeriesFixed(ctx, req, consolidation.None)
fixed, err := s.getSeriesFixed(ctx, ss, req, consolidation.None)
return fixed, req.OutInterval, err
} else if !readRollup && normalize {
fixed, err := s.getSeriesFixed(ctx, req, consolidation.None)
fixed, err := s.getSeriesFixed(ctx, ss, req, consolidation.None)
if err != nil {
return nil, req.OutInterval, err
}
return consolidation.ConsolidateContext(ctx, fixed, req.AggNum, req.Consolidator), req.OutInterval, nil
} else if readRollup && !normalize {
if req.Consolidator == consolidation.Avg {
sumFixed, err := s.getSeriesFixed(ctx, req, consolidation.Sum)
sumFixed, err := s.getSeriesFixed(ctx, ss, req, consolidation.Sum)
if err != nil {
return nil, req.OutInterval, err
}
cntFixed, err := s.getSeriesFixed(ctx, req, consolidation.Cnt)
cntFixed, err := s.getSeriesFixed(ctx, ss, req, consolidation.Cnt)
if err != nil {
return nil, req.OutInterval, err
}
@@ -357,17 +360,17 @@ func (s *Server) getTarget(ctx context.Context, req models.Req) (points []schema
cntFixed,
), req.OutInterval, nil
} else {
fixed, err := s.getSeriesFixed(ctx, req, req.Consolidator)
fixed, err := s.getSeriesFixed(ctx, ss, req, req.Consolidator)
return fixed, req.OutInterval, err
}
} else {
// readRollup && normalize
if req.Consolidator == consolidation.Avg {
sumFixed, err := s.getSeriesFixed(ctx, req, consolidation.Sum)
sumFixed, err := s.getSeriesFixed(ctx, ss, req, consolidation.Sum)
if err != nil {
return nil, req.OutInterval, err
}
cntFixed, err := s.getSeriesFixed(ctx, req, consolidation.Cnt)
cntFixed, err := s.getSeriesFixed(ctx, ss, req, consolidation.Cnt)
if err != nil {
return nil, req.OutInterval, err
}
@@ -377,7 +380,7 @@ func (s *Server) getTarget(ctx context.Context, req models.Req) (points []schema
consolidation.Consolidate(cntFixed, req.AggNum, consolidation.Sum),
), req.OutInterval, nil
} else {
fixed, err := s.getSeriesFixed(ctx, req, req.Consolidator)
fixed, err := s.getSeriesFixed(ctx, ss, req, req.Consolidator)
if err != nil {
return nil, req.OutInterval, err
}
@@ -393,7 +396,7 @@ func logLoad(typ string, key schema.AMKey, from, to uint32) {
// getSeriesFixed gets the series and makes sure the output is quantized
// (needed because the raw chunks don't contain quantized data)
// TODO: we can probably forego Fix if archive > 0
func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidator consolidation.Consolidator) ([]schema.Point, error) {
func (s *Server) getSeriesFixed(ctx context.Context, ss *models.StorageStats, req models.Req, consolidator consolidation.Consolidator) ([]schema.Point, error) {
select {
case <-ctx.Done():
//request canceled
@@ -405,7 +408,7 @@ func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidato
if rctx.From == rctx.To {
return nil, nil
}
res, err := s.getSeries(rctx)
res, err := s.getSeries(rctx, ss)
if err != nil {
return nil, err
}
@@ -425,7 +428,7 @@ func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidato
// getSeries returns points from mem (and store if needed), within the range from (inclusive) - to (exclusive)
// it can query for data within aggregated archives, by using fn min/max/sum/cnt and providing the matching agg span as interval
// pass consolidation.None as consolidator to mean read from raw interval, otherwise we'll read from aggregated series.
func (s *Server) getSeries(ctx *requestContext) (mdata.Result, error) {
func (s *Server) getSeries(ctx *requestContext, ss *models.StorageStats) (mdata.Result, error) {
res, err := s.getSeriesAggMetrics(ctx)
if err != nil {
return res, err
@@ -436,10 +439,10 @@ func (s *Server) getSeries(ctx *requestContext) (mdata.Result, error) {
return res, nil
default:
}
ss.IncChunksFromTank(uint32(len(res.Iters)))

log.Debugf("oldest from aggmetrics is %d", res.Oldest)
span := opentracing.SpanFromContext(ctx.ctx)
span.SetTag("oldest_in_ring", res.Oldest)

if res.Oldest <= ctx.From {
reqSpanMem.ValueUint32(ctx.To - ctx.From)
@@ -450,8 +453,11 @@ func (s *Server) getSeries(ctx *requestContext) (mdata.Result, error) {
// if to < oldest -> no need to search until oldest, only search until to
// adds iters from both the cache and the store (if applicable)
until := util.Min(res.Oldest, ctx.To)
fromCache, err := s.getSeriesCachedStore(ctx, until)
fromCache, err := s.getSeriesCachedStore(ctx, ss, until)
if err != nil {
tracing.Failure(span)
tracing.Error(span, err)
log.Errorf("getSeriesCachedStore: %s", err.Error())
return res, err
}
res.Iters = append(fromCache, res.Iters...)
@@ -482,9 +488,6 @@ func (s *Server) itersToPoints(ctx *requestContext, iters []tsz.Iter) []schema.P
}

func (s *Server) getSeriesAggMetrics(ctx *requestContext) (mdata.Result, error) {
_, span := tracing.NewSpan(ctx.ctx, s.Tracer, "getSeriesAggMetrics")
defer span.Finish()

// this is a query node that for some reason received a request
if s.MemoryStore == nil {
return mdata.Result{}, nil
@@ -506,7 +509,7 @@ func (s *Server) getSeriesAggMetrics(ctx *requestContext) (mdata.Result, error)
}

// will only fetch until until, but uses ctx.To for debug logging
func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]tsz.Iter, error) {
func (s *Server) getSeriesCachedStore(ctx *requestContext, ss *models.StorageStats, until uint32) ([]tsz.Iter, error) {

// this is a query node that for some reason received a data request
if s.BackendStore == nil {
@@ -516,21 +519,16 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]tsz.
var iters []tsz.Iter
var prevts uint32

_, span := tracing.NewSpan(ctx.ctx, s.Tracer, "getSeriesCachedStore")
defer span.Finish()
span.SetTag("key", ctx.AMKey)
span.SetTag("from", ctx.From)
span.SetTag("until", until)

reqSpanBoth.ValueUint32(ctx.To - ctx.From)
logLoad("cassan", ctx.AMKey, ctx.From, ctx.To)

log.Debugf("cache: searching query key %s, from %d, until %d", ctx.AMKey, ctx.From, until)
cacheRes, err := s.Cache.Search(ctx.ctx, ctx.AMKey, ctx.From, until)
if err != nil {
return iters, err
return iters, fmt.Errorf("Cache.Search() failed: %+v", err.Error())
}
log.Debugf("cache: result start %d, end %d", len(cacheRes.Start), len(cacheRes.End))
ss.IncCacheResult(cacheRes.Type)

// check to see if the request has been canceled, if so abort now.
select {
@@ -545,12 +543,11 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]tsz.
prevts = itgen.T0
if err != nil {
// TODO(replay) figure out what to do if one piece is corrupt
tracing.Failure(span)
tracing.Errorf(span, "itergen: error getting iter from Start list %+v", err)
return iters, err
return iters, fmt.Errorf("error getting iter from cacheResult.Start: %+v", err.Error())
}
iters = append(iters, iter)
}
ss.IncChunksFromCache(uint32(len(cacheRes.Start)))

// check to see if the request has been canceled, if so abort now.
select {
@@ -561,11 +558,11 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]tsz.
}

// the request cannot completely be served from cache, it will require store involvement
if !cacheRes.Complete {
if cacheRes.Type != cache.Hit {
if cacheRes.From != cacheRes.Until {
storeIterGens, err := s.BackendStore.Search(ctx.ctx, ctx.AMKey, ctx.Req.TTL, cacheRes.From, cacheRes.Until)
if err != nil {
return iters, err
return iters, fmt.Errorf("BackendStore.Search() failed: %+v", err.Error())
}
// check to see if the request has been canceled, if so abort now.
select {
@@ -579,16 +576,15 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]tsz.
iter, err := itgen.Get()
if err != nil {
// TODO(replay) figure out what to do if one piece is corrupt
tracing.Failure(span)
tracing.Errorf(span, "itergen: error getting iter from store slice %+v", err)
if i > 0 {
// add all the iterators that are in good shape
s.Cache.AddRange(ctx.AMKey, prevts, storeIterGens[:i])
}
return iters, err
return iters, fmt.Errorf("error getting iter from BackendStore.Search(): %+v", err.Error())
}
iters = append(iters, iter)
}
ss.IncChunksFromStore(uint32(len(storeIterGens)))
// it's important that the itgens get added in chronological order,
// currently we rely on store returning results in order
s.Cache.AddRange(ctx.AMKey, prevts, storeIterGens)
@@ -599,11 +595,11 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]tsz.
iter, err := cacheRes.End[i].Get()
if err != nil {
// TODO(replay) figure out what to do if one piece is corrupt
log.Errorf("itergen: error getting iter from cache result end slice %+v", err.Error())
return iters, err
return iters, fmt.Errorf("error getting iter from cacheResult.End: %+v", err.Error())
}
iters = append(iters, iter)
}
ss.IncChunksFromCache(uint32(len(cacheRes.End)))
}

return iters, nil
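Taken together, the counters are incremented at each read path (tank, cache, store) and surfaced once on the getTargetsLocal span. A small, purely illustrative exercise of the sketch types above, with a fake span standing in for a tracer:

```go
package sketch

import "fmt"

// fakeSpan satisfies the minimal SetTag surface used by the StorageStats sketch;
// it just records tags so the example runs without a tracer.
type fakeSpan struct{ tags map[string]interface{} }

func (s *fakeSpan) SetTag(key string, value interface{}) { s.tags[key] = value }

// Example mimics one local render: some chunks come from the tank, the cache
// answers part of the request, the store supplies the rest.
func Example() {
	span := &fakeSpan{tags: map[string]interface{}{}}

	var ss StorageStats
	ss.IncChunksFromTank(4)       // getSeries: iterators from AggMetrics
	ss.IncCacheResult(HitPartial) // getSeriesCachedStore: cache classified the lookup
	ss.IncChunksFromCache(2)      // chunks the cache could supply
	ss.IncChunksFromStore(7)      // chunks fetched from the backend store

	ss.Trace(span) // getTargetsLocal: one span, tagged with the totals
	fmt.Println(span.tags["chunks-from-store"]) // 7
}
```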