Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Getdata use pointslicepool and getdata return to pool #1924

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions api/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,11 +375,20 @@ func (s *Server) getData(ctx *middleware.Context, request models.GetData) {
if err != nil {
// the only errors returned are from us catching panics, so we should treat them
// all as internalServerErrors

for _, s := range series {
pointSlicePool.PutMaybeNil(s.Datapoints)
}

log.Errorf("HTTP getData() %s", err.Error())
response.Write(ctx, response.WrapError(err))
return
}
response.Write(ctx, response.NewMsgp(200, &models.GetDataRespV1{Stats: ss, Series: series}))

for _, s := range series {
pointSlicePool.PutMaybeNil(s.Datapoints)
}
}

func (s *Server) indexDelete(ctx *middleware.Context, req models.IndexDelete) {
Expand Down
26 changes: 19 additions & 7 deletions api/dataprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func divide(pointsA, pointsB []schema.Point) []schema.Point {
}

// getTargets retrieves the series for the given requests by querying local and/or remote nodes as needed
// if an error occurs, it returns all intermediate series to the pool and returns only the error
func (s *Server) getTargets(ctx context.Context, ss *models.StorageStats, reqs []models.Req) ([]models.Series, error) {
// split reqs into local and remote.
localReqs := make([]models.Req, 0)
Expand Down Expand Up @@ -198,17 +199,25 @@ func (s *Server) getTargets(ctx context.Context, ss *models.StorageStats, reqs [
}()

out := make([]models.Series, 0)
var firstErr error
for resp := range responses {
if resp.err != nil {
return nil, resp.err
// even when we run into an error, still collect any series so we can feed them back to the pool
if resp.err != nil && firstErr == nil {
firstErr = resp.err
}
out = append(out, resp.series...)
}
if firstErr != nil {
for _, s := range out {
pointSlicePool.PutMaybeNil(s.Datapoints)
}
return nil, firstErr
}
log.Debugf("DP getTargets: %d series found on cluster", len(out))
return out, nil
}

// getTargetsRemote issues the requests - keyed by node name - on other nodes
// getTargetsRemote issues the requests - keyed by node name - on other nodes and returns corresponding series, along with the first error encountered
func (s *Server) getTargetsRemote(ctx context.Context, ss *models.StorageStats, remoteReqs map[string][]models.Req) ([]models.Series, error) {

allPeers, err := cluster.MembersForSpeculativeQuery()
Expand Down Expand Up @@ -269,7 +278,7 @@ func (s *Server) getTargetsRemote(ctx context.Context, ss *models.StorageStats,
return out, err
}

// error is the error of the first failing target request
// getTargetsLocal returns the series corresponding to the given requests, along with the first error encountered
func (s *Server) getTargetsLocal(ctx context.Context, ss *models.StorageStats, reqs []models.Req) ([]models.Series, error) {
log.Debugf("DP getTargetsLocal: handling %d reqs locally", len(reqs))
rCtx, span := tracing.NewSpan(ctx, s.Tracer, "getTargetsLocal")
Expand Down Expand Up @@ -310,14 +319,17 @@ LOOP:
close(responses)
}()
out := make([]models.Series, 0, len(reqs))
var firstErr error
for resp := range responses {
if resp.err != nil {
if resp.err != nil && firstErr == nil {
tags.Error.Set(span, true)
return nil, resp.err
firstErr = resp.err
}
out = append(out, resp.series)
}

if firstErr != nil {
return out, firstErr
}
ss.Trace(span)
log.Debugf("DP getTargetsLocal: %d series found locally", len(out))
return out, nil
Expand Down
2 changes: 2 additions & 0 deletions api/init.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"github.com/grafana/metrictank/api/models"
"github.com/grafana/metrictank/expr"
"github.com/grafana/metrictank/pointslicepool"
)
Expand All @@ -10,4 +11,5 @@ var pointSlicePool *pointslicepool.PointSlicePool
func init() {
pointSlicePool = pointslicepool.New(pointslicepool.DefaultPointSliceSize)
expr.Pool(pointSlicePool)
models.Pool(pointSlicePool)
}
7 changes: 7 additions & 0 deletions api/models/init_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package models

import "github.com/grafana/metrictank/pointslicepool"

func init() {
pointSlicePool = pointslicepool.New(100)
}
12 changes: 12 additions & 0 deletions api/models/pointslicepool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package models

import (
"github.com/grafana/metrictank/pointslicepool"
)

var pointSlicePool *pointslicepool.PointSlicePool

// Pool tells the models package library which pool to use for temporary []schema.Point
func Pool(p *pointslicepool.PointSlicePool) {
pointSlicePool = p
}
2 changes: 1 addition & 1 deletion api/models/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

//go:generate msgp
//msgp:ignore SeriesMetaPropertiesExport
//msgp:ignore SeriesMetaPropertiesExport Series

type Series struct {
Target string // for fetched data, set from models.Req.Target, i.e. the metric graphite key. for function output, whatever should be shown as target string (legend)
Expand Down
Loading