
Jaeger cleanup: much fewer spans, but with more stats - and more stats for meta section #1380

Merged: 9 commits, Jul 10, 2019
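The core pattern of the change, shown as a self-contained sketch rather than the actual metrictank code (the struct fields, tag names, and the main driver are illustrative assumptions; the Add, Trace, and Inc* method names do appear in the diff below): instead of opening a tracing span in every storage helper, a StorageStats accumulator is threaded down the read path and reported once, as tags on the enclosing span.

```go
package main

import (
	"context"
	"sync/atomic"

	opentracing "github.com/opentracing/opentracing-go"
)

// StorageStats sketches the accumulator threaded through the read path.
// Field names here are assumptions, not the actual models.StorageStats layout.
type StorageStats struct {
	ChunksFromTank  uint32
	ChunksFromCache uint32
	ChunksFromStore uint32
}

// The Inc* helpers are safe to call from concurrent per-request goroutines.
func (ss *StorageStats) IncChunksFromTank(n uint32)  { atomic.AddUint32(&ss.ChunksFromTank, n) }
func (ss *StorageStats) IncChunksFromCache(n uint32) { atomic.AddUint32(&ss.ChunksFromCache, n) }
func (ss *StorageStats) IncChunksFromStore(n uint32) { atomic.AddUint32(&ss.ChunksFromStore, n) }

// Add merges stats returned by a cluster peer (already unmarshaled off the
// wire, so plain reads of 'other' are fine) into this accumulator.
func (ss *StorageStats) Add(other StorageStats) {
	atomic.AddUint32(&ss.ChunksFromTank, other.ChunksFromTank)
	atomic.AddUint32(&ss.ChunksFromCache, other.ChunksFromCache)
	atomic.AddUint32(&ss.ChunksFromStore, other.ChunksFromStore)
}

// Trace reports the totals once, as tags on an existing span, instead of
// opening a child span in every helper that touches storage.
func (ss *StorageStats) Trace(span opentracing.Span) {
	if span == nil {
		return
	}
	span.SetTag("chunks-from-tank", atomic.LoadUint32(&ss.ChunksFromTank))
	span.SetTag("chunks-from-cache", atomic.LoadUint32(&ss.ChunksFromCache))
	span.SetTag("chunks-from-store", atomic.LoadUint32(&ss.ChunksFromStore))
}

func main() {
	var ss StorageStats
	ss.IncChunksFromTank(2)  // e.g. chunks served from the in-memory ring
	ss.IncChunksFromCache(5) // e.g. chunks served from the chunk cache
	// with no tracer configured there is no span in the context, so this no-ops
	ss.Trace(opentracing.SpanFromContext(context.Background()))
}
```

The diffs below thread exactly this shape through getData, getTargets, getTargetsLocal, getTargetsRemote, getTarget, getSeriesFixed, getSeries and getSeriesCachedStore.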
15 changes: 15 additions & 0 deletions Gopkg.lock


7 changes: 5 additions & 2 deletions api/cluster.go
@@ -16,6 +16,7 @@ import (
"github.com/grafana/metrictank/cluster"
"github.com/grafana/metrictank/expr/tagquery"
"github.com/grafana/metrictank/stats"
opentracing "github.com/opentracing/opentracing-go"
log "github.com/sirupsen/logrus"
"github.com/tinylib/msgp/msgp"
)
@@ -339,15 +340,17 @@ func (s *Server) indexList(ctx *middleware.Context, req models.IndexList) {
}

func (s *Server) getData(ctx *middleware.Context, request models.GetData) {
series, err := s.getTargetsLocal(ctx.Req.Context(), request.Requests)
var ss models.StorageStats
series, err := s.getTargetsLocal(ctx.Req.Context(), &ss, request.Requests)
if err != nil {
// the only errors returned are from us catching panics, so we should treat them
// all as internalServerErrors
log.Errorf("HTTP getData() %s", err.Error())
response.Write(ctx, response.WrapError(err))
return
}
response.Write(ctx, response.NewMsgp(200, &models.GetDataResp{Series: series}))
ss.Trace(opentracing.SpanFromContext(ctx.Req.Context()))
Member

I don't think this is needed here. ss.Trace() is already called at the end of s.getTargetsLocal()

Contributor Author

Makes sense. We only need it in getTargetsLocal (for individual nodes, both local and cluster peers) and in executePlan (for aggregated-across-peers per-response stats).
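For illustration, the shape the reply describes could be condensed as below. This is a sketch, not code from this PR: it is a fragment that assumes the caller placed a span in ctx, and it reuses the Add and Trace calls visible in the diff.

```go
import (
	"context"

	"github.com/grafana/metrictank/api/models"
	opentracing "github.com/opentracing/opentracing-go"
)

// mergePeerResponses merges each peer's stats into one accumulator and
// reports the aggregate once, instead of tagging a span per peer.
func mergePeerResponses(ctx context.Context, resps []models.GetDataRespV1) ([]models.Series, models.StorageStats) {
	var ss models.StorageStats
	var series []models.Series
	for _, resp := range resps {
		ss.Add(resp.Stats) // the same call getTargetsRemote makes per peer response
		series = append(series, resp.Series...)
	}
	ss.Trace(opentracing.SpanFromContext(ctx)) // single reporting point
	return series, ss
}
```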

response.Write(ctx, response.NewMsgp(200, &models.GetDataRespV1{Stats: ss, Series: series}))
}

func (s *Server) indexDelete(ctx *middleware.Context, req models.IndexDelete) {
Expand Down
90 changes: 43 additions & 47 deletions api/dataprocessor.go
@@ -12,6 +12,7 @@ import (
"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/consolidation"
"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/mdata/cache"
"github.com/grafana/metrictank/mdata/chunk/tsz"
"github.com/grafana/metrictank/tracing"
"github.com/grafana/metrictank/util"
@@ -147,7 +148,7 @@ func divide(pointsA, pointsB []schema.Point) []schema.Point {
return pointsA
}

func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Series, error) {
func (s *Server) getTargets(ctx context.Context, ss *models.StorageStats, reqs []models.Req) ([]models.Series, error) {
// split reqs into local and remote.
localReqs := make([]models.Req, 0)
remoteReqs := make(map[string][]models.Req)
@@ -169,7 +170,7 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se
go func() {
// the only errors returned are from us catching panics, so we should treat them
// all as internalServerErrors
series, err := s.getTargetsLocal(getCtx, localReqs)
series, err := s.getTargetsLocal(getCtx, ss, localReqs)
if err != nil {
cancel()
}
@@ -181,7 +182,7 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se
wg.Add(1)
go func() {
// all errors returned are *response.Error.
series, err := s.getTargetsRemote(getCtx, remoteReqs)
series, err := s.getTargetsRemote(getCtx, ss, remoteReqs)
if err != nil {
cancel()
}
@@ -209,7 +210,7 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se

// getTargetsRemote issues the requests on other nodes
// it's nothing more than a thin network wrapper around getTargetsLocal of a peer.
func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]models.Req) ([]models.Series, error) {
func (s *Server) getTargetsRemote(ctx context.Context, ss *models.StorageStats, remoteReqs map[string][]models.Req) ([]models.Series, error) {
responses := make(chan getTargetsResp, len(remoteReqs))
rCtx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -226,7 +227,7 @@ func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]m
responses <- getTargetsResp{nil, err}
return
}
var resp models.GetDataResp
var resp models.GetDataRespV1
_, err = resp.UnmarshalMsg(buf)
if err != nil {
cancel()
@@ -235,6 +236,7 @@ func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]m
return
}
log.Debugf("DP getTargetsRemote: %s returned %d series", node.GetName(), len(resp.Series))
ss.Add(resp.Stats)
responses <- getTargetsResp{resp.Series, nil}
}(nodeReqs)
}
@@ -257,14 +259,17 @@ func (s *Server) getTargetsRemote(ctx context.Context, remoteReqs map[string][]m
}

// error is the error of the first failing target request
func (s *Server) getTargetsLocal(ctx context.Context, reqs []models.Req) ([]models.Series, error) {
func (s *Server) getTargetsLocal(ctx context.Context, ss *models.StorageStats, reqs []models.Req) ([]models.Series, error) {
log.Debugf("DP getTargetsLocal: handling %d reqs locally", len(reqs))
rCtx, span := tracing.NewSpan(ctx, s.Tracer, "getTargetsLocal")
defer span.Finish()
span.SetTag("num_reqs", len(reqs))
responses := make(chan getTargetsResp, len(reqs))

var wg sync.WaitGroup
reqLimiter := util.NewLimiter(getTargetsConcurrency)

rCtx, cancel := context.WithCancel(ctx)
rCtx, cancel := context.WithCancel(rCtx)
defer cancel()
LOOP:
for _, req := range reqs {
@@ -276,12 +281,9 @@ LOOP:
}
wg.Add(1)
go func(req models.Req) {
rCtx, span := tracing.NewSpan(rCtx, s.Tracer, "getTargetsLocal")
req.Trace(span)
pre := time.Now()
points, interval, err := s.getTarget(rCtx, req)
points, interval, err := s.getTarget(rCtx, ss, req)
if err != nil {
tags.Error.Set(span, true)
cancel() // cancel all other requests.
responses <- getTargetsResp{nil, err}
} else {
@@ -300,7 +302,6 @@ LOOP:
wg.Done()
// pop an item of our limiter so that other requests can be processed.
reqLimiter.Release()
span.Finish()
}(req)
}
go func() {
@@ -310,16 +311,18 @@ LOOP:
out := make([]models.Series, 0, len(reqs))
for resp := range responses {
if resp.err != nil {
tags.Error.Set(span, true)
return nil, resp.err
}
out = append(out, resp.series...)
}
ss.Trace(span)
log.Debugf("DP getTargetsLocal: %d series found locally", len(out))
return out, nil

}

func (s *Server) getTarget(ctx context.Context, req models.Req) (points []schema.Point, interval uint32, err error) {
func (s *Server) getTarget(ctx context.Context, ss *models.StorageStats, req models.Req) (points []schema.Point, interval uint32, err error) {
defer doRecover(&err)
readRollup := req.Archive != 0 // do we need to read from a downsampled series?
normalize := req.AggNum > 1 // do we need to normalize points at runtime?
@@ -333,21 +336,21 @@ func (s *Server) getTarget(ctx context.Context, req models.Req) (points []schema
}

if !readRollup && !normalize {
fixed, err := s.getSeriesFixed(ctx, req, consolidation.None)
fixed, err := s.getSeriesFixed(ctx, ss, req, consolidation.None)
return fixed, req.OutInterval, err
} else if !readRollup && normalize {
fixed, err := s.getSeriesFixed(ctx, req, consolidation.None)
fixed, err := s.getSeriesFixed(ctx, ss, req, consolidation.None)
if err != nil {
return nil, req.OutInterval, err
}
return consolidation.ConsolidateContext(ctx, fixed, req.AggNum, req.Consolidator), req.OutInterval, nil
} else if readRollup && !normalize {
if req.Consolidator == consolidation.Avg {
sumFixed, err := s.getSeriesFixed(ctx, req, consolidation.Sum)
sumFixed, err := s.getSeriesFixed(ctx, ss, req, consolidation.Sum)
if err != nil {
return nil, req.OutInterval, err
}
cntFixed, err := s.getSeriesFixed(ctx, req, consolidation.Cnt)
cntFixed, err := s.getSeriesFixed(ctx, ss, req, consolidation.Cnt)
if err != nil {
return nil, req.OutInterval, err
}
@@ -357,17 +360,17 @@ func (s *Server) getTarget(ctx context.Context, req models.Req) (points []schema
cntFixed,
), req.OutInterval, nil
} else {
fixed, err := s.getSeriesFixed(ctx, req, req.Consolidator)
fixed, err := s.getSeriesFixed(ctx, ss, req, req.Consolidator)
return fixed, req.OutInterval, err
}
} else {
// readRollup && normalize
if req.Consolidator == consolidation.Avg {
sumFixed, err := s.getSeriesFixed(ctx, req, consolidation.Sum)
sumFixed, err := s.getSeriesFixed(ctx, ss, req, consolidation.Sum)
if err != nil {
return nil, req.OutInterval, err
}
cntFixed, err := s.getSeriesFixed(ctx, req, consolidation.Cnt)
cntFixed, err := s.getSeriesFixed(ctx, ss, req, consolidation.Cnt)
if err != nil {
return nil, req.OutInterval, err
}
@@ -377,7 +380,7 @@ func (s *Server) getTarget(ctx context.Context, req models.Req) (points []schema
consolidation.Consolidate(cntFixed, req.AggNum, consolidation.Sum),
), req.OutInterval, nil
} else {
fixed, err := s.getSeriesFixed(ctx, req, req.Consolidator)
fixed, err := s.getSeriesFixed(ctx, ss, req, req.Consolidator)
if err != nil {
return nil, req.OutInterval, err
}
@@ -393,7 +396,7 @@ func logLoad(typ string, key schema.AMKey, from, to uint32) {
// getSeriesFixed gets the series and makes sure the output is quantized
// (needed because the raw chunks don't contain quantized data)
// TODO: we can probably forego Fix if archive > 0
func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidator consolidation.Consolidator) ([]schema.Point, error) {
func (s *Server) getSeriesFixed(ctx context.Context, ss *models.StorageStats, req models.Req, consolidator consolidation.Consolidator) ([]schema.Point, error) {
select {
case <-ctx.Done():
//request canceled
@@ -405,7 +408,7 @@ func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidato
if rctx.From == rctx.To {
return nil, nil
}
res, err := s.getSeries(rctx)
res, err := s.getSeries(rctx, ss)
if err != nil {
return nil, err
}
@@ -425,7 +428,7 @@ func (s *Server) getSeriesFixed(ctx context.Context, req models.Req, consolidato
// getSeries returns points from mem (and store if needed), within the range from (inclusive) - to (exclusive)
// it can query for data within aggregated archives, by using fn min/max/sum/cnt and providing the matching agg span as interval
// pass consolidation.None as consolidator to mean read from raw interval, otherwise we'll read from aggregated series.
func (s *Server) getSeries(ctx *requestContext) (mdata.Result, error) {
func (s *Server) getSeries(ctx *requestContext, ss *models.StorageStats) (mdata.Result, error) {
res, err := s.getSeriesAggMetrics(ctx)
if err != nil {
return res, err
@@ -436,10 +439,10 @@ func (s *Server) getSeries(ctx *requestContext) (mdata.Result, error) {
return res, nil
default:
}
ss.IncChunksFromTank(uint32(len(res.Iters)))

log.Debugf("oldest from aggmetrics is %d", res.Oldest)
span := opentracing.SpanFromContext(ctx.ctx)
span.SetTag("oldest_in_ring", res.Oldest)

if res.Oldest <= ctx.From {
reqSpanMem.ValueUint32(ctx.To - ctx.From)
@@ -450,8 +453,11 @@ func (s *Server) getSeries(ctx *requestContext) (mdata.Result, error) {
// if to < oldest -> no need to search until oldest, only search until to
// adds iters from both the cache and the store (if applicable)
until := util.Min(res.Oldest, ctx.To)
fromCache, err := s.getSeriesCachedStore(ctx, until)
fromCache, err := s.getSeriesCachedStore(ctx, ss, until)
if err != nil {
tracing.Failure(span)
tracing.Error(span, err)
log.Errorf("getSeriesCachedStore: %s", err.Error())
return res, err
}
res.Iters = append(fromCache, res.Iters...)
@@ -482,9 +488,6 @@ func (s *Server) itersToPoints(ctx *requestContext, iters []tsz.Iter) []schema.P
}

func (s *Server) getSeriesAggMetrics(ctx *requestContext) (mdata.Result, error) {
_, span := tracing.NewSpan(ctx.ctx, s.Tracer, "getSeriesAggMetrics")
defer span.Finish()

// this is a query node that for some reason received a request
if s.MemoryStore == nil {
return mdata.Result{}, nil
@@ -506,7 +509,7 @@ func (s *Server) getSeriesAggMetrics(ctx *requestContext) (mdata.Result, error)
}

// will only fetch until until, but uses ctx.To for debug logging
func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]tsz.Iter, error) {
func (s *Server) getSeriesCachedStore(ctx *requestContext, ss *models.StorageStats, until uint32) ([]tsz.Iter, error) {

// this is a query node that for some reason received a data request
if s.BackendStore == nil {
@@ -516,21 +519,16 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]tsz.
var iters []tsz.Iter
var prevts uint32

_, span := tracing.NewSpan(ctx.ctx, s.Tracer, "getSeriesCachedStore")
defer span.Finish()
span.SetTag("key", ctx.AMKey)
span.SetTag("from", ctx.From)
span.SetTag("until", until)

reqSpanBoth.ValueUint32(ctx.To - ctx.From)
logLoad("cassan", ctx.AMKey, ctx.From, ctx.To)

log.Debugf("cache: searching query key %s, from %d, until %d", ctx.AMKey, ctx.From, until)
cacheRes, err := s.Cache.Search(ctx.ctx, ctx.AMKey, ctx.From, until)
if err != nil {
return iters, err
return iters, fmt.Errorf("Cache.Search() failed: %+v", err.Error())
}
log.Debugf("cache: result start %d, end %d", len(cacheRes.Start), len(cacheRes.End))
ss.IncCacheResult(cacheRes.Type)

// check to see if the request has been canceled, if so abort now.
select {
@@ -545,12 +543,11 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]tsz.
prevts = itgen.T0
if err != nil {
// TODO(replay) figure out what to do if one piece is corrupt
tracing.Failure(span)
tracing.Errorf(span, "itergen: error getting iter from Start list %+v", err)
return iters, err
return iters, fmt.Errorf("error getting iter from cacheResult.Start: %+v", err.Error())
}
iters = append(iters, iter)
}
ss.IncChunksFromCache(uint32(len(cacheRes.Start)))

// check to see if the request has been canceled, if so abort now.
select {
@@ -561,11 +558,11 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]tsz.
}

// the request cannot completely be served from cache, it will require store involvement
if !cacheRes.Complete {
if cacheRes.Type != cache.Hit {
if cacheRes.From != cacheRes.Until {
storeIterGens, err := s.BackendStore.Search(ctx.ctx, ctx.AMKey, ctx.Req.TTL, cacheRes.From, cacheRes.Until)
if err != nil {
return iters, err
return iters, fmt.Errorf("BackendStore.Search() failed: %+v", err.Error())
}
// check to see if the request has been canceled, if so abort now.
select {
@@ -579,16 +576,15 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]tsz.
iter, err := itgen.Get()
if err != nil {
// TODO(replay) figure out what to do if one piece is corrupt
tracing.Failure(span)
tracing.Errorf(span, "itergen: error getting iter from store slice %+v", err)
if i > 0 {
// add all the iterators that are in good shape
s.Cache.AddRange(ctx.AMKey, prevts, storeIterGens[:i])
}
return iters, err
return iters, fmt.Errorf("error getting iter from BackendStore.Search(): %+v", err.Error())
}
iters = append(iters, iter)
}
ss.IncChunksFromStore(uint32(len(storeIterGens)))
// it's important that the itgens get added in chronological order,
// currently we rely on store returning results in order
s.Cache.AddRange(ctx.AMKey, prevts, storeIterGens)
@@ -599,11 +595,11 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]tsz.
iter, err := cacheRes.End[i].Get()
if err != nil {
// TODO(replay) figure out what to do if one piece is corrupt
log.Errorf("itergen: error getting iter from cache result end slice %+v", err.Error())
return iters, err
return iters, fmt.Errorf("error getting iter from cacheResult.End: %+v", err.Error())
}
iters = append(iters, iter)
}
ss.IncChunksFromCache(uint32(len(cacheRes.End)))
}

return iters, nil
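To tie the last file together, here is a condensed, hedged sketch of getSeriesCachedStore's flow after this PR, written as if inside package api. It is not the verbatim function: itersFrom is a hypothetical helper standing in for the per-itgen Get() loops, and the corrupt-chunk and cancellation handling is elided.

```go
// condensed sketch of the cache/store interplay above; not the verbatim code.
func fetchIters(s *Server, ctx *requestContext, ss *models.StorageStats, until uint32) ([]tsz.Iter, error) {
	cacheRes, err := s.Cache.Search(ctx.ctx, ctx.AMKey, ctx.From, until)
	if err != nil {
		return nil, fmt.Errorf("Cache.Search() failed: %+v", err.Error())
	}
	ss.IncCacheResult(cacheRes.Type) // hit, partial hit or miss: counted once per query

	iters, err := itersFrom(cacheRes.Start) // hypothetical helper wrapping the itgen.Get() loop
	if err != nil {
		return nil, err
	}
	ss.IncChunksFromCache(uint32(len(cacheRes.Start)))

	// anything short of a full cache hit, with a non-empty uncovered range,
	// requires store involvement
	if cacheRes.Type != cache.Hit && cacheRes.From != cacheRes.Until {
		storeIterGens, err := s.BackendStore.Search(ctx.ctx, ctx.AMKey, ctx.Req.TTL, cacheRes.From, cacheRes.Until)
		if err != nil {
			return nil, fmt.Errorf("BackendStore.Search() failed: %+v", err.Error())
		}
		storeIters, err := itersFrom(storeIterGens)
		if err != nil {
			return nil, err
		}
		iters = append(iters, storeIters...)
		ss.IncChunksFromStore(uint32(len(storeIterGens)))

		// itgens must be added to the cache in chronological order; prevts is
		// the T0 of the last chunk seen in cacheRes.Start (zero if none)
		var prevts uint32
		if n := len(cacheRes.Start); n > 0 {
			prevts = cacheRes.Start[n-1].T0
		}
		s.Cache.AddRange(ctx.AMKey, prevts, storeIterGens)
	}

	endIters, err := itersFrom(cacheRes.End)
	if err != nil {
		return nil, err
	}
	iters = append(iters, endIters...)
	ss.IncChunksFromCache(uint32(len(cacheRes.End)))
	return iters, nil
}
```

The net effect across the file: the per-helper spans (getSeriesAggMetrics, getSeriesCachedStore, and the per-request span in getTargetsLocal's goroutine) are gone, and the same information now arrives as counters and tags on the single getTargetsLocal span, plus aggregated per-response stats on the peer path.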