This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Merge pull request #956 from bloomberg/speculativeQueries
Speculative queries
replay authored Jul 25, 2018
2 parents 9986b3e + 840f352 commit a7f011d
Showing 14 changed files with 224 additions and 124 deletions.
119 changes: 119 additions & 0 deletions api/cluster.go
@@ -7,17 +7,31 @@ import (
"net/http"
"strconv"
"sync"
"time"

"github.com/grafana/metrictank/api/middleware"
"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/api/response"
"github.com/grafana/metrictank/cluster"
"github.com/grafana/metrictank/stats"
"github.com/raintank/worldping-api/pkg/log"
"github.com/tinylib/msgp/msgp"
)

var NotFoundErr = errors.New("not found")

var (

// metric api.cluster.speculative.attempts is how many peer queries resulted in speculation
speculativeAttempts = stats.NewCounter32("api.cluster.speculative.attempts")

// metric api.cluster.speculative.wins is how many peer queries were improved due to speculation
speculativeWins = stats.NewCounter32("api.cluster.speculative.wins")

// metric api.cluster.speculative.requests is how many speculative http requests made to peers
speculativeRequests = stats.NewCounter32("api.cluster.speculative.requests")
)

func (s *Server) explainPriority(ctx *middleware.Context) {
var data []interface{}
for _, p := range s.prioritySetters {
@@ -287,3 +301,108 @@ func (s *Server) peerQuery(ctx context.Context, data cluster.Traceable, name, pa

return result, nil
}

// peerQuerySpeculative takes a request and the path to request it on, then fans it out
// across the cluster, except to the local peer. If any peer fails, requests to
// other peers are aborted. If enough peers have been heard from (based on
// speculation-threshold configuration), and we are missing the others, try to
// speculatively query other members of the shard group.
// ctx: request context
// data: request to be submitted
// name: name to be used in logging & tracing
// path: path to request on
func (s *Server) peerQuerySpeculative(ctx context.Context, data cluster.Traceable, name, path string) (map[string]PeerResponse, error) {
peerGroups, err := cluster.MembersForSpeculativeQuery()
if err != nil {
log.Error(3, "HTTP peerQuerySpeculative unable to get peers, %s", err)
return nil, err
}
log.Debug("HTTP %s across %d instances", name, len(peerGroups)-1)

reqCtx, cancel := context.WithCancel(ctx)
defer cancel()

originalPeers := make(map[string]struct{}, len(peerGroups))
pendingResponses := make(map[int32]struct{}, len(peerGroups))
receivedResponses := make(map[int32]struct{}, len(peerGroups))

responses := make(chan struct {
shardGroup int32
data PeerResponse
err error
}, 1)

askPeer := func(shardGroup int32, peer cluster.Node) {
log.Debug("HTTP Render querying %s%s", peer.GetName(), path)
buf, err := peer.Post(reqCtx, name, path, data)

select {
case <-ctx.Done():
return
default:
// Not canceled, continue
}

if err != nil {
cancel()
log.Error(4, "HTTP Render error querying %s%s: %q", peer.GetName(), path, err)
}
responses <- struct {
shardGroup int32
data PeerResponse
err error
}{shardGroup, PeerResponse{peer, buf}, err}
}

for group, peers := range peerGroups {
peer := peers[0]
originalPeers[peer.GetName()] = struct{}{}
pendingResponses[group] = struct{}{}
go askPeer(group, peer)
}

result := make(map[string]PeerResponse)

specCheckTicker := time.NewTicker(5 * time.Millisecond)
defer specCheckTicker.Stop()

for len(pendingResponses) > 0 {
select {
case resp := <-responses:
if _, ok := receivedResponses[resp.shardGroup]; ok {
// already received this response (possibly speculatively)
continue
}

if resp.err != nil {
return nil, resp.err
}

result[resp.data.peer.GetName()] = resp.data
receivedResponses[resp.shardGroup] = struct{}{}
delete(pendingResponses, resp.shardGroup)
delete(originalPeers, resp.data.peer.GetName())

case <-specCheckTicker.C:
// Check if it's time to speculate!
percentReceived := 1 - (float64(len(pendingResponses)) / float64(len(peerGroups)))
if percentReceived > speculationThreshold {
// kick off speculative queries to other members now
specCheckTicker.Stop()
speculativeAttempts.Inc()
for shardGroup := range pendingResponses {
eligiblePeers := peerGroups[shardGroup][1:]
for _, peer := range eligiblePeers {
speculativeRequests.Inc()
go askPeer(shardGroup, peer)
}
}
}
}
}

if len(originalPeers) > 0 {
speculativeWins.Inc()
}

return result, nil
}
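
To make the speculation trigger concrete, here is a small self-contained Go sketch (an illustration only, not part of the diff); the cluster size of 4 shard groups and the 0.75 threshold are hypothetical values.

package main

import "fmt"

func main() {
	// Hypothetical cluster of 4 shard groups, of which 3 have already answered.
	pending, total := 1, 4
	percentReceived := 1 - (float64(pending) / float64(total)) // 0.75

	// The diff uses a strict comparison, so speculation fires only once MORE
	// than speculation-threshold of the shard groups have responded.
	fmt.Println(percentReceived > 0.75) // false: 3 of 4 is not enough at threshold 0.75
	fmt.Println(percentReceived > 0.70) // true: speculative requests would be issued

	// With the default threshold of 1, percentReceived can never exceed it while
	// responses are still pending, which is why setting 1 disables speculation.
	fmt.Println(percentReceived > 1.0) // false
}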
2 changes: 2 additions & 0 deletions api/config.go
@@ -29,6 +29,7 @@ var (

getTargetsConcurrency int
tagdbDefaultLimit uint
speculationThreshold float64

graphiteProxy *httputil.ReverseProxy
timeZone *time.Location
@@ -50,6 +51,7 @@ func ConfigSetup() {
apiCfg.StringVar(&timeZoneStr, "time-zone", "local", "timezone for interpreting from/until values when needed, specified using [zoneinfo name](https://en.wikipedia.org/wiki/Tz_database#Names_of_time_zones) e.g. 'America/New_York', 'UTC' or 'local' to use local server timezone")
apiCfg.IntVar(&getTargetsConcurrency, "get-targets-concurrency", 20, "maximum number of concurrent threads for fetching data on the local node. Each thread handles a single series.")
apiCfg.UintVar(&tagdbDefaultLimit, "tagdb-default-limit", 100, "default limit for tagdb query results, can be overridden with query parameter \"limit\"")
apiCfg.Float64Var(&speculationThreshold, "speculation-threshold", 1, "ratio of peer responses after which speculation is used. Set to 1 to disable.")
globalconf.Register("http", apiCfg)
}
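
As a usage illustration (the 0.75 value is hypothetical, not a shipped default), enabling speculation so that lagging shard groups are re-queried once more than 75% of groups have responded would look roughly like this in the http section of the metrictank config file, assuming the usual ini layout read by globalconf:

[http]
speculation-threshold = 0.75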

158 changes: 35 additions & 123 deletions api/graphite.go
@@ -55,64 +55,39 @@ type Series struct {
}

func (s *Server) findSeries(ctx context.Context, orgId uint32, patterns []string, seenAfter int64) ([]Series, error) {
peers, err := cluster.MembersForQuery()
data := models.IndexFind{
Patterns: patterns,
OrgId: orgId,
From: seenAfter,
}

resps, err := s.peerQuerySpeculative(ctx, data, "findSeriesRemote", "/index/find")
if err != nil {
log.Error(3, "HTTP findSeries unable to get peers, %s", err)
return nil, err
}
log.Debug("HTTP findSeries for %v across %d instances", patterns, len(peers))
var wg sync.WaitGroup

responses := make(chan struct {
series []Series
err error
}, 1)
findCtx, cancel := context.WithCancel(ctx)
defer cancel()
for _, peer := range peers {
log.Debug("HTTP findSeries getting results from %s", peer.GetName())
wg.Add(1)
if peer.IsLocal() {
go func() {
result, err := s.findSeriesLocal(findCtx, orgId, patterns, seenAfter)
if err != nil {
// cancel requests on all other peers.
cancel()
}
responses <- struct {
series []Series
err error
}{result, err}
wg.Done()
}()
} else {
go func(peer cluster.Node) {
result, err := s.findSeriesRemote(findCtx, orgId, patterns, seenAfter, peer)
if err != nil {
// cancel requests on all other peers.
cancel()
}
responses <- struct {
series []Series
err error
}{result, err}
wg.Done()
}(peer)
}
select {
case <-ctx.Done():
//request canceled
return nil, nil
default:
}

// wait for all findSeries goroutines to end, then close our responses channel
go func() {
wg.Wait()
close(responses)
}()

series := make([]Series, 0)
for resp := range responses {
if resp.err != nil {
resp := models.IndexFindResp{}
for _, r := range resps {
_, err = resp.UnmarshalMsg(r.buf)
if err != nil {
return nil, err
}
series = append(series, resp.series...)

for pattern, nodes := range resp.Nodes {
series = append(series, Series{
Pattern: pattern,
Node: r.peer,
Series: nodes,
})
log.Debug("HTTP findSeries %d matches for %s found on %s", len(nodes), pattern, r.peer.GetName())
}
}

return series, nil
@@ -864,22 +839,10 @@ func (s *Server) graphiteTagDetails(ctx *middleware.Context, request models.Grap
}

func (s *Server) clusterTagDetails(ctx context.Context, orgId uint32, tag, filter string, from int64) (map[string]uint64, error) {
result, err := s.MetricIndex.TagDetails(orgId, tag, filter, from)
if err != nil {
return nil, err
}
if result == nil {
result = make(map[string]uint64)
}
select {
case <-ctx.Done():
//request canceled
return nil, nil
default:
}
result := make(map[string]uint64)

data := models.IndexTagDetails{OrgId: orgId, Tag: tag, Filter: filter, From: from}
resps, err := s.peerQuery(ctx, data, "clusterTagDetails", "/index/tag_details", false)
resps, err := s.peerQuerySpeculative(ctx, data, "clusterTagDetails", "/index/tag_details")
if err != nil {
return nil, err
}
@@ -926,7 +889,8 @@ func (s *Server) graphiteTagFindSeries(ctx *middleware.Context, request models.G
}

func (s *Server) clusterFindByTag(ctx context.Context, orgId uint32, expressions []string, from int64) ([]Series, error) {
result, err := s.MetricIndex.FindByTag(orgId, expressions, from)
data := models.IndexFindByTag{OrgId: orgId, Expr: expressions, From: from}
resps, err := s.peerQuerySpeculative(ctx, data, "clusterFindByTag", "/index/find_by_tag")
if err != nil {
return nil, err
}
@@ -940,27 +904,6 @@ func (s *Server) clusterFindByTag(ctx context.Context, orgId uint32, expressions

var allSeries []Series

for _, series := range result {
allSeries = append(allSeries, Series{
Pattern: series.Path,
Node: cluster.Manager.ThisNode(),
Series: []idx.Node{series},
})
}

data := models.IndexFindByTag{OrgId: orgId, Expr: expressions, From: from}
resps, err := s.peerQuery(ctx, data, "clusterFindByTag", "/index/find_by_tag", false)
if err != nil {
return nil, err
}

select {
case <-ctx.Done():
//request canceled
return nil, nil
default:
}

for _, r := range resps {
resp := models.IndexFindByTagResp{}
_, err = resp.UnmarshalMsg(r.buf)
@@ -1003,24 +946,8 @@ func (s *Server) graphiteTags(ctx *middleware.Context, request models.GraphiteTa
}

func (s *Server) clusterTags(ctx context.Context, orgId uint32, filter string, from int64) ([]string, error) {
result, err := s.MetricIndex.Tags(orgId, filter, from)
if err != nil {
return nil, err
}
select {
case <-ctx.Done():
//request canceled
return nil, nil
default:
}

tagSet := make(map[string]struct{}, len(result))
for _, tag := range result {
tagSet[tag] = struct{}{}
}

data := models.IndexTags{OrgId: orgId, Filter: filter, From: from}
resps, err := s.peerQuery(ctx, data, "clusterTags", "/index/tags", false)
resps, err := s.peerQuerySpeculative(ctx, data, "clusterTags", "/index/tags")
if err != nil {
return nil, err
}
Expand All @@ -1032,6 +959,7 @@ func (s *Server) clusterTags(ctx context.Context, orgId uint32, filter string, f
default:
}

tagSet := make(map[string]struct{})
resp := models.IndexTagsResp{}
for _, r := range resps {
_, err = resp.UnmarshalMsg(r.buf)
@@ -1066,18 +994,10 @@ func (s *Server) graphiteAutoCompleteTags(ctx *middleware.Context, request model
}

func (s *Server) clusterAutoCompleteTags(ctx context.Context, orgId uint32, prefix string, expressions []string, from int64, limit uint) ([]string, error) {
result, err := s.MetricIndex.FindTags(orgId, prefix, expressions, from, limit)
if err != nil {
return nil, err
}

tagSet := make(map[string]struct{}, len(result))
for _, tag := range result {
tagSet[tag] = struct{}{}
}
tagSet := make(map[string]struct{})

data := models.IndexAutoCompleteTags{OrgId: orgId, Prefix: prefix, Expr: expressions, From: from, Limit: limit}
responses, err := s.peerQuery(ctx, data, "clusterAutoCompleteTags", "/index/tags/autoComplete/tags", false)
responses, err := s.peerQuerySpeculative(ctx, data, "clusterAutoCompleteTags", "/index/tags/autoComplete/tags")
if err != nil {
return nil, err
}
@@ -1121,18 +1041,10 @@ func (s *Server) graphiteAutoCompleteTagValues(ctx *middleware.Context, request
}

func (s *Server) clusterAutoCompleteTagValues(ctx context.Context, orgId uint32, tag, prefix string, expressions []string, from int64, limit uint) ([]string, error) {
result, err := s.MetricIndex.FindTagValues(orgId, tag, prefix, expressions, from, limit)
if err != nil {
return nil, err
}

valSet := make(map[string]struct{}, len(result))
for _, val := range result {
valSet[val] = struct{}{}
}
valSet := make(map[string]struct{})

data := models.IndexAutoCompleteTagValues{OrgId: orgId, Tag: tag, Prefix: prefix, Expr: expressions, From: from, Limit: limit}
responses, err := s.peerQuery(ctx, data, "clusterAutoCompleteValues", "/index/tags/autoComplete/values", false)
responses, err := s.peerQuerySpeculative(ctx, data, "clusterAutoCompleteValues", "/index/tags/autoComplete/values")
if err != nil {
return nil, err
}