Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1295 from grafana/issue-1293
Browse files Browse the repository at this point in the history
Don't issue requests that don't apply to query nodes; if query nodes do receive such requests, don't crash.
  • Loading branch information
Dieterbe authored Apr 26, 2019
2 parents 4b1e436 + e0d0a4c commit dd9b92d
Show file tree
Hide file tree
Showing 11 changed files with 179 additions and 62 deletions.
8 changes: 7 additions & 1 deletion api/ccache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ func (s *Server) ccacheDelete(ctx *middleware.Context, req models.CCacheDelete)
}
}

// nothing to do on query nodes. they have no index or chunk cache
if s.MetricIndex == nil {
response.Write(ctx, response.NewJson(code, res, ""))
return
}

fullFlush := false
for _, pattern := range req.Patterns {
if pattern == "**" {
Expand Down Expand Up @@ -85,7 +91,7 @@ func (s *Server) ccacheDeletePropagate(ctx context.Context, req *models.CCacheDe
// we never want to propagate more than once to avoid loops
req.Propagate = false

peers := cluster.Manager.MemberList()
peers := cluster.Manager.MemberList(false, true)
peerResults := make(map[string]models.CCacheDeleteResp)
var mu sync.Mutex
var wg sync.WaitGroup
Expand Down
4 changes: 2 additions & 2 deletions api/ccache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestMetricDeleteWithErrorInPropagation(t *testing.T) {

respEncoded := response.NewJson(500, resp, "")
buf, _ := respEncoded.Body()
manager.Peers = append(manager.Peers, cluster.NewMockNode(false, "0", buf))
manager.Peers = append(manager.Peers, cluster.NewMockNode(false, "0", []int32{0}, buf))

// define how many series/archives are going to get deleted by this server
delSeries := 1
Expand Down Expand Up @@ -244,7 +244,7 @@ func TestMetricDeletePropagation(t *testing.T) {
expectedDeletedArchives += resp.DeletedArchives
respEncoded := response.NewJson(200, resp, "")
buf, _ := respEncoded.Body()
manager.Peers = append(manager.Peers, cluster.NewMockNode(false, peer, buf))
manager.Peers = append(manager.Peers, cluster.NewMockNode(false, peer, []int32{0}, buf))
}

// define how many series/archives are going to get deleted by this server
Expand Down
95 changes: 78 additions & 17 deletions api/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (s *Server) getClusterStatus(ctx *middleware.Context) {
status := models.ClusterStatus{
ClusterName: cluster.ClusterName,
NodeName: cluster.Manager.ThisNode().GetName(),
Members: cluster.Manager.MemberList(),
Members: cluster.Manager.MemberList(false, false),
}
response.Write(ctx, response.NewJson(200, status, ""))
}
Expand All @@ -79,7 +79,7 @@ func (s *Server) postClusterMembers(ctx *middleware.Context, req models.ClusterM
memberNames := make(map[string]struct{})
var toJoin []string

for _, memberNode := range cluster.Manager.MemberList() {
for _, memberNode := range cluster.Manager.MemberList(false, false) {
memberNames[memberNode.GetName()] = struct{}{}
}

Expand Down Expand Up @@ -114,6 +114,12 @@ func (s *Server) postClusterMembers(ctx *middleware.Context, req models.ClusterM
func (s *Server) indexFind(ctx *middleware.Context, req models.IndexFind) {
resp := models.NewIndexFindResp()

// query nodes don't own any data
if s.MetricIndex == nil {
response.Write(ctx, response.NewMsgp(200, resp))
return
}

for _, pattern := range req.Patterns {
nodes, err := s.MetricIndex.Find(req.OrgId, pattern, req.From)
if err != nil {
Expand All @@ -126,6 +132,13 @@ func (s *Server) indexFind(ctx *middleware.Context, req models.IndexFind) {
}

func (s *Server) indexTagDetails(ctx *middleware.Context, req models.IndexTagDetails) {

// query nodes don't own any data
if s.MetricIndex == nil {
response.Write(ctx, response.NewMsgp(200, &models.IndexTagDetailsResp{}))
return
}

values, err := s.MetricIndex.TagDetails(req.OrgId, req.Tag, req.Filter, req.From)
if err != nil {
response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
Expand All @@ -135,6 +148,13 @@ func (s *Server) indexTagDetails(ctx *middleware.Context, req models.IndexTagDet
}

func (s *Server) indexTags(ctx *middleware.Context, req models.IndexTags) {

// query nodes don't own any data
if s.MetricIndex == nil {
response.Write(ctx, response.NewMsgp(200, &models.IndexTagsResp{}))
return
}

tags, err := s.MetricIndex.Tags(req.OrgId, req.Filter, req.From)
if err != nil {
response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
Expand All @@ -144,6 +164,13 @@ func (s *Server) indexTags(ctx *middleware.Context, req models.IndexTags) {
}

func (s *Server) indexAutoCompleteTags(ctx *middleware.Context, req models.IndexAutoCompleteTags) {

// query nodes don't own any data
if s.MetricIndex == nil {
response.Write(ctx, response.NewMsgp(200, models.StringList(nil)))
return
}

tags, err := s.MetricIndex.FindTags(req.OrgId, req.Prefix, req.Expr, req.From, req.Limit)
if err != nil {
response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
Expand All @@ -153,6 +180,13 @@ func (s *Server) indexAutoCompleteTags(ctx *middleware.Context, req models.Index
}

func (s *Server) indexAutoCompleteTagValues(ctx *middleware.Context, req models.IndexAutoCompleteTagValues) {

// query nodes don't own any data
if s.MetricIndex == nil {
response.Write(ctx, response.NewMsgp(200, models.StringList(nil)))
return
}

tags, err := s.MetricIndex.FindTagValues(req.OrgId, req.Tag, req.Prefix, req.Expr, req.From, req.Limit)
if err != nil {
response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
Expand All @@ -162,19 +196,34 @@ func (s *Server) indexAutoCompleteTagValues(ctx *middleware.Context, req models.
}

// indexTagDelSeries calls DeleteTagged on the local index for request.OrgId
// and request.Paths, then replies with a msgp-encoded
// models.IndexTagDelSeriesResp whose Count is the number of entries that
// DeleteTagged returned. Errors from DeleteTagged are written back via
// response.WrapErrorForTagDB.
func (s *Server) indexTagDelSeries(ctx *middleware.Context, request models.IndexTagDelSeries) {
	// res is declared exactly once, up front, so the query-node early return
	// and the normal path share the same response value (Count defaults to 0).
	res := models.IndexTagDelSeriesResp{}

	// nothing to do on query nodes.
	if s.MetricIndex == nil {
		response.Write(ctx, response.NewMsgp(200, res))
		return
	}

	deleted, err := s.MetricIndex.DeleteTagged(request.OrgId, request.Paths)
	if err != nil {
		response.Write(ctx, response.WrapErrorForTagDB(err))
		return
	}

	res.Count = len(deleted)
	response.Write(ctx, response.NewMsgp(200, res))
}

func (s *Server) indexFindByTag(ctx *middleware.Context, req models.IndexFindByTag) {

// query nodes don't own any data.
if s.MetricIndex == nil {
response.Write(ctx, response.NewMsgp(200, &models.IndexFindByTagResp{}))
return
}

metrics, err := s.MetricIndex.FindByTag(req.OrgId, req.Expr, req.From)
if err != nil {
response.Write(ctx, response.NewError(http.StatusBadRequest, err.Error()))
Expand All @@ -185,6 +234,13 @@ func (s *Server) indexFindByTag(ctx *middleware.Context, req models.IndexFindByT

// IndexGet returns a msgp encoded schema.MetricDefinition
func (s *Server) indexGet(ctx *middleware.Context, req models.IndexGet) {

// query nodes don't own any data.
if s.MetricIndex == nil {
response.Write(ctx, response.NewMsgp(404, nil))
return
}

def, ok := s.MetricIndex.Get(req.MKey)
if !ok {
response.Write(ctx, response.NewError(http.StatusNotFound, "Not Found"))
Expand All @@ -196,6 +252,13 @@ func (s *Server) indexGet(ctx *middleware.Context, req models.IndexGet) {

// IndexList returns msgp encoded schema.MetricDefinition's
func (s *Server) indexList(ctx *middleware.Context, req models.IndexList) {

// query nodes don't own any data.
if s.MetricIndex == nil {
response.Write(ctx, response.NewMsgpArray(200, nil))
return
}

defs := s.MetricIndex.List(req.OrgId)
resp := make([]msgp.Marshaler, len(defs))
for i := range defs {
Expand All @@ -218,6 +281,12 @@ func (s *Server) getData(ctx *middleware.Context, request models.GetData) {
}

func (s *Server) indexDelete(ctx *middleware.Context, req models.IndexDelete) {

// nothing to do on query nodes.
if s.MetricIndex == nil {
return
}

defs, err := s.MetricIndex.Delete(req.OrgId, req.Query)
if err != nil {
// errors can only be caused by bad request.
Expand All @@ -237,25 +306,17 @@ type PeerResponse struct {
}

// peerQuery takes a request and the path to request it on, then fans it out
// across the cluster, except to the local peer or peers without data.
// Note: unlike the other peerQuery methods, we include peers that are not ready.
// If any peer fails, requests to
// other peers are aborted.
// ctx: request context
// data: request to be submitted
// name: name to be used in logging & tracing
// path: path to request on
func (s *Server) peerQuery(ctx context.Context, data cluster.Traceable, name, path string, allPeers bool) (map[string]PeerResponse, error) {
var peers []cluster.Node
var err error

if allPeers {
peers = cluster.Manager.MemberList()
} else {
peers, err = cluster.MembersForQuery()
if err != nil {
log.Errorf("HTTP peerQuery unable to get peers, %s", err.Error())
return nil, err
}
}
func (s *Server) peerQuery(ctx context.Context, data cluster.Traceable, name, path string) (map[string]PeerResponse, error) {

peers := cluster.Manager.MemberList(false, true)
log.Debugf("HTTP %s across %d instances", name, len(peers)-1)

reqCtx, cancel := context.WithCancel(ctx)
Expand Down
12 changes: 12 additions & 0 deletions api/dataprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,12 @@ func (s *Server) itersToPoints(ctx *requestContext, iters []tsz.Iter) []schema.P
func (s *Server) getSeriesAggMetrics(ctx *requestContext) (mdata.Result, error) {
_, span := tracing.NewSpan(ctx.ctx, s.Tracer, "getSeriesAggMetrics")
defer span.Finish()

// this is a query node that for some reason received a request
if s.MemoryStore == nil {
return mdata.Result{}, nil
}

metric, ok := s.MemoryStore.Get(ctx.AMKey.MKey)
if !ok {
return mdata.Result{
Expand All @@ -480,6 +486,12 @@ func (s *Server) getSeriesAggMetrics(ctx *requestContext) (mdata.Result, error)

// will only fetch up to the given `until` timestamp, but uses ctx.To for debug logging
func (s *Server) getSeriesCachedStore(ctx *requestContext, until uint32) ([]tsz.Iter, error) {

// this is a query node that for some reason received a data request
if s.BackendStore == nil {
return nil, nil
}

var iters []tsz.Iter
var prevts uint32

Expand Down
32 changes: 24 additions & 8 deletions api/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,12 @@ func (s *Server) metricsFind(ctx *middleware.Context, request models.GraphiteFin
}

// listLocal returns whatever s.MetricIndex.List reports for orgId.
// When s.MetricIndex is nil (query nodes have no data) it returns nil.
func (s *Server) listLocal(orgId uint32) []idx.Archive {
	if index := s.MetricIndex; index != nil {
		return index.List(orgId)
	}

	// query nodes have no data
	return nil
}

Expand Down Expand Up @@ -507,7 +513,7 @@ func findTreejson(query string, nodes []idx.Node) models.SeriesTree {
}

func (s *Server) metricsDelete(ctx *middleware.Context, req models.MetricsDelete) {
peers := cluster.Manager.MemberList()
peers := cluster.Manager.MemberList(false, true)
peers = append(peers, cluster.Manager.ThisNode())
log.Debugf("HTTP metricsDelete for %v across %d instances", req.Query, len(peers))

Expand Down Expand Up @@ -586,6 +592,12 @@ func (s *Server) metricsDelete(ctx *middleware.Context, req models.MetricsDelete
}

// metricsDeleteLocal runs s.MetricIndex.Delete for orgId and query and returns
// the number of definitions Delete handed back, together with its error.
// When s.MetricIndex is nil (query nodes) it returns 0 and a nil error.
func (s *Server) metricsDeleteLocal(orgId uint32, query string) (int, error) {
	if s.MetricIndex != nil {
		removed, err := s.MetricIndex.Delete(orgId, query)
		return len(removed), err
	}

	// nothing to do on query nodes.
	return 0, nil
}
Expand Down Expand Up @@ -1084,22 +1096,26 @@ func (s *Server) graphiteFunctions(ctx *middleware.Context) {
}

func (s *Server) graphiteTagDelSeries(ctx *middleware.Context, request models.GraphiteTagDelSeries) {
deleted, err := s.MetricIndex.DeleteTagged(ctx.OrgId, request.Paths)
if err != nil {
response.Write(ctx, response.WrapErrorForTagDB(err))
return
}

res := models.GraphiteTagDelSeriesResp{}
res.Count = len(deleted)

// nothing to do on query nodes.
if s.MetricIndex != nil {
deleted, err := s.MetricIndex.DeleteTagged(ctx.OrgId, request.Paths)
if err != nil {
response.Write(ctx, response.WrapErrorForTagDB(err))
return
}
res.Count = len(deleted)
}

if !request.Propagate {
response.Write(ctx, response.NewJson(200, res, ""))
return
}

data := models.IndexTagDelSeries{OrgId: ctx.OrgId, Paths: request.Paths}
responses, err := s.peerQuery(ctx.Req.Context(), data, "clusterTagDelSeries,", "/index/tags/delSeries", true)
responses, err := s.peerQuery(ctx.Req.Context(), data, "clusterTagDelSeries,", "/index/tags/delSeries")
if err != nil {
response.Write(ctx, response.WrapErrorForTagDB(err))
return
Expand Down
12 changes: 3 additions & 9 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ func MembersForQuery() ([]Node, error) {
}
}

for _, member := range Manager.MemberList() {
if !member.IsReady() || member.GetName() == thisNode.GetName() {
for _, member := range Manager.MemberList(true, true) {
if member.GetName() == thisNode.GetName() {
continue
}
for _, part := range member.GetPartitions() {
Expand Down Expand Up @@ -159,7 +159,7 @@ LOOP:
// keyed by the first (lowest) partition of their shard group
func MembersForSpeculativeQuery() (map[int32][]Node, error) {
thisNode := Manager.ThisNode()
allNodes := Manager.MemberList()
allNodes := Manager.MemberList(true, true)
membersMap := make(map[int32][]Node)

// If we are running in dev mode, just return thisNode
Expand All @@ -172,13 +172,7 @@ func MembersForSpeculativeQuery() (map[int32][]Node, error) {

// store the available nodes for each partition group
for _, member := range allNodes {
if !member.IsReady() {
continue
}
partitions := member.GetPartitions()
if len(partitions) == 0 {
continue
}
memberStartPartition := partitions[0]

if _, ok := membersMap[memberStartPartition]; !ok {
Expand Down
1 change: 1 addition & 0 deletions cluster/if.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type Node interface {
IsReady() bool
GetPartitions() []int32
GetPriority() int
HasData() bool
Post(context.Context, string, string, Traceable) ([]byte, error)
GetName() string
}
Loading

0 comments on commit dd9b92d

Please sign in to comment.