Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Batch update meta records #1442

Merged
merged 11 commits into from
Aug 30, 2019
25 changes: 25 additions & 0 deletions api/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,3 +631,28 @@ func (s *Server) indexMetaTagRecordUpsert(ctx *middleware.Context, req models.In
Created: created,
}))
}

func (s *Server) indexMetaTagRecordSwap(ctx *middleware.Context, req models.IndexMetaTagRecordSwap) {
if s.MetricIndex == nil {
response.Write(ctx, response.NewMsgp(200, &models.MetaTagRecordSwapResult{}))
return
}

metaTagRecords := make([]tagquery.MetaTagRecord, len(req.Records))
for i, rawRecord := range req.Records {
var err error
metaTagRecords[i], err = tagquery.ParseMetaTagRecord(rawRecord.MetaTags, rawRecord.Expressions)
if err != nil {
response.Write(ctx, response.WrapError(fmt.Errorf("Error when parsing record %d: %s", i, err)))
return
}
}

added, deleted, err := s.MetricIndex.MetaTagRecordSwap(req.OrgId, metaTagRecords)
if err != nil {
response.Write(ctx, response.WrapError(fmt.Errorf("Error when swapping meta tag records: %s", err)))
return
}

response.Write(ctx, response.NewMsgp(200, &models.MetaTagRecordSwapResult{Added: added, Deleted: deleted}))
}
73 changes: 73 additions & 0 deletions api/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -1421,3 +1421,76 @@ func (s *Server) metaTagRecordUpsert(ctx *middleware.Context, upsertRequest mode
response.Write(ctx, response.NewJson(200, res, ""))
}
}

func (s *Server) metaTagRecordSwap(ctx *middleware.Context, swapRequest models.MetaTagRecordSwap) {
metaTagRecords := make([]tagquery.MetaTagRecord, len(swapRequest.Records))
for i, rawRecord := range swapRequest.Records {
var err error
metaTagRecords[i], err = tagquery.ParseMetaTagRecord(rawRecord.MetaTags, rawRecord.Expressions)
if err != nil {
response.Write(ctx, response.WrapError(fmt.Errorf("Error when parsing record %d: %s", i, err)))
return
}
}

var added, deleted uint32
if s.MetricIndex != nil {
var err error
added, deleted, err = s.MetricIndex.MetaTagRecordSwap(ctx.OrgId, metaTagRecords)
if err != nil {
response.Write(ctx, response.WrapError(err))
return
}

if !swapRequest.Propagate {
response.Write(ctx, response.NewJson(200, models.MetaTagRecordSwapResult{
Added: added,
Deleted: deleted,
}, ""))
return
}
} else if !swapRequest.Propagate {
response.Write(ctx, response.NewJson(200, models.MetaTagRecordSwapResult{}, ""))
return
}

res := models.MetaTagRecordSwapResultByNode{
Local: models.MetaTagRecordSwapResult{
Added: added,
Deleted: deleted,
},
}

indexSwapRequest := models.IndexMetaTagRecordSwap{
OrgId: ctx.OrgId,
Records: swapRequest.Records,
}

results, errors := s.peerQuery(ctx.Req.Context(), indexSwapRequest, "metaTagRecordSwap", "/index/metaTags/swap")

if len(errors) > 0 {
res.PeerErrors = make(map[string]string, len(errors))
for peer, err := range errors {
res.PeerErrors[peer] = err.Error()
}
}

if len(results) > 0 {
res.PeerResults = make(map[string]models.MetaTagRecordSwapResult, len(results))
for peer, resp := range results {
peerResp := models.MetaTagRecordSwapResult{}
_, err := peerResp.UnmarshalMsg(resp.buf)
if err != nil {
res.PeerErrors[peer] = fmt.Sprintf("Error when unmarshaling response: %s", err.Error())
continue
}
res.PeerResults[peer] = peerResp
}
}

if len(errors) > 0 {
response.Write(ctx, response.NewJson(500, res, ""))
} else {
response.Write(ctx, response.NewJson(200, res, ""))
}
}
49 changes: 48 additions & 1 deletion api/models/meta_records.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
traceLog "github.com/opentracing/opentracing-go/log"
)

//go:generate msgp

type MetaTagRecordUpsert struct {
MetaTags []string `json:"metaTags" binding:"Required"`
Expressions []string `json:"expressions" binding:"Required"`
Expand All @@ -30,7 +32,6 @@ type MetaTagRecordUpsertResultByNode struct {
PeerErrors map[string]string `json:"peerErrors"`
}

//go:generate msgp
type MetaTagRecordUpsertResult struct {
MetaTags []string `json:"metaTags"`
Queries []string `json:"queries"`
Expand All @@ -53,3 +54,49 @@ func (m IndexMetaTagRecordUpsert) Trace(span opentracing.Span) {

func (m IndexMetaTagRecordUpsert) TraceDebug(span opentracing.Span) {
}

type MetaTagRecordSwap struct {
Records []MetaTagRecord `json:"records"`
Propagate bool `json:"propagate"`
}

type MetaTagRecord struct {
MetaTags []string `json:"metaTags" binding:"Required"`
Expressions []string `json:"expressions" binding:"Required"`
}

func (m MetaTagRecordSwap) Trace(span opentracing.Span) {
span.LogFields(
traceLog.Uint32("recordCount", uint32(len(m.Records))),
traceLog.Bool("propagate", m.Propagate),
)
}

func (m MetaTagRecordSwap) TraceDebug(span opentracing.Span) {
}

type MetaTagRecordSwapResultByNode struct {
Local MetaTagRecordSwapResult `json:"local"`
PeerResults map[string]MetaTagRecordSwapResult `json:"peerResults"`
PeerErrors map[string]string `json:"peerErrors"`
}

type MetaTagRecordSwapResult struct {
Deleted uint32 `json:"deleted"`
Added uint32 `json:"added"`
}

type IndexMetaTagRecordSwap struct {
OrgId uint32 `json:"orgId" binding:"Required"`
Records []MetaTagRecord `json:"records"`
}

func (m IndexMetaTagRecordSwap) Trace(span opentracing.Span) {
span.SetTag("orgId", m.OrgId)
span.LogFields(
traceLog.Uint32("recordCount", uint32(len(m.Records))),
)
}

func (m IndexMetaTagRecordSwap) TraceDebug(span opentracing.Span) {
}
Loading