Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

cleanup dataprocessor code #1753

Merged
merged 2 commits into from
Apr 7, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions api/dataprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,17 +214,24 @@ func (s *Server) getTargets(ctx context.Context, ss *models.StorageStats, reqs [
return out, nil
}

// getTargetsRemote issues the requests on other nodes
// it's nothing more than a thin network wrapper around getTargetsLocal of a peer.
// getTargetsRemote issues the requests - keyed by node name - on other nodes
func (s *Server) getTargetsRemote(ctx context.Context, ss *models.StorageStats, remoteReqs map[string][]models.Req) ([]models.Series, error) {

allPeers, err := cluster.MembersForSpeculativeQuery()
if err != nil {
return nil, err
}

// will contain all replicas for each shardgroup
// (though typically only one replica is used per shardgroup unless spec-exec kicks in)
requiredPeers := make(map[int32][]cluster.Node)
shardReqs := make(map[int32][]models.Req)

// Note: we can expect remoteReqs to possibly have multiple node string keys that are part of the same shardgroup.
// Why? Because a request may need multiple series lookups, and issue multiple, distinct find/find_by_tag calls, each of which may end up using different replicas
// within the same shardgroup (due to changing priorities, spec-exec, etc)>
// Thus here we categorize all requests into groups per shard ID, rather than by hostname.

for _, nodeReqs := range remoteReqs {
shardID := nodeReqs[0].Node.GetPartitions()[0]
peers := allPeers[shardID]
Expand All @@ -242,7 +249,7 @@ func (s *Server) getTargetsRemote(ctx context.Context, ss *models.StorageStats,
var resp models.GetDataRespV1
reqs, ok := shardReqs[node.GetPartitions()[0]]
if !ok {
log.Warnf("Unexpected shard group, node = %q", node)
log.Warnf("DP Unexpected shard group, node = %q", node)
// Return empty response, no error
return resp, nil
}
Expand Down Expand Up @@ -449,7 +456,7 @@ func (s *Server) getSeries(ctx *requestContext, ss *models.StorageStats) (mdata.
}
ss.IncChunksFromTank(uint32(len(res.Iters)))

log.Debugf("oldest from aggmetrics is %d", res.Oldest)
log.Debugf("DP oldest from aggmetrics is %d", res.Oldest)
span := opentracing.SpanFromContext(ctx.ctx)

if res.Oldest <= ctx.From {
Expand All @@ -465,7 +472,7 @@ func (s *Server) getSeries(ctx *requestContext, ss *models.StorageStats) (mdata.
if err != nil {
tracing.Failure(span)
tracing.Error(span, err)
log.Errorf("getSeriesCachedStore: %s", err.Error())
log.Errorf("DP getSeriesCachedStore: %s", err.Error())
return res, err
}
res.Iters = append(fromCache, res.Iters...)
Expand Down Expand Up @@ -530,12 +537,12 @@ func (s *Server) getSeriesCachedStore(ctx *requestContext, ss *models.StorageSta
reqSpanBoth.ValueUint32(ctx.To - ctx.From)
logLoad("cassan", ctx.AMKey, ctx.From, ctx.To)

log.Debugf("cache: searching query key %s, from %d, until %d", ctx.AMKey, ctx.From, until)
log.Debugf("DP cache: searching query key %s, from %d, until %d", ctx.AMKey, ctx.From, until)
cacheRes, err := s.Cache.Search(ctx.ctx, ctx.AMKey, ctx.From, until)
if err != nil {
return iters, fmt.Errorf("Cache.Search() failed: %+v", err.Error())
}
log.Debugf("cache: result start %d, end %d", len(cacheRes.Start), len(cacheRes.End))
log.Debugf("DP cache: result start %d, end %d", len(cacheRes.Start), len(cacheRes.End))
ss.IncCacheResult(cacheRes.Type)

// check to see if the request has been canceled, if so abort now.
Expand Down