Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Merge pull request #897 from bloomberg/feature_multiDef
Browse files Browse the repository at this point in the history
Fix multi-interval series issue
  • Loading branch information
Dieterbe authored May 16, 2018
2 parents e797a9f + 81a95cf commit 6e3d03b
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 27 deletions.
2 changes: 1 addition & 1 deletion api/dataprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (s *Server) getTargets(ctx context.Context, reqs []models.Req) ([]models.Se
if len(remoteReqs) > 0 {
wg.Add(1)
go func() {
// all errors returned returned are *response.Error.
// all errors returned are *response.Error.
series, err := s.getTargetsRemote(getCtx, remoteReqs)
if err != nil {
cancel()
Expand Down
19 changes: 7 additions & 12 deletions api/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,8 +924,6 @@ func (s *Server) graphiteTagFindSeries(ctx *middleware.Context, request models.G
}

func (s *Server) clusterFindByTag(ctx context.Context, orgId uint32, expressions []string, from int64) ([]Series, error) {
seriesSet := make(map[string]Series)

result, err := s.MetricIndex.FindByTag(orgId, expressions, from)
if err != nil {
return nil, err
Expand All @@ -938,12 +936,14 @@ func (s *Server) clusterFindByTag(ctx context.Context, orgId uint32, expressions
default:
}

var allSeries []Series

for _, series := range result {
seriesSet[series.Path] = Series{
allSeries = append(allSeries, Series{
Pattern: series.Path,
Node: cluster.Manager.ThisNode(),
Series: []idx.Node{series},
}
})
}

data := models.IndexFindByTag{OrgId: orgId, Expr: expressions, From: from}
Expand All @@ -966,20 +966,15 @@ func (s *Server) clusterFindByTag(ctx context.Context, orgId uint32, expressions
return nil, err
}
for _, series := range resp.Metrics {
seriesSet[series.Path] = Series{
allSeries = append(allSeries, Series{
Pattern: series.Path,
Node: r.peer,
Series: []idx.Node{series},
}
})
}
}

series := make([]Series, 0, len(seriesSet))
for _, s := range seriesSet {
series = append(series, s)
}

return series, nil
return allSeries, nil
}

func (s *Server) graphiteTags(ctx *middleware.Context, request models.GraphiteTags) {
Expand Down
46 changes: 33 additions & 13 deletions idx/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,10 @@ func (m *MemoryIdx) Update(point schema.MetricPoint, partition int32) (idx.Archi
if LogLevel < 2 {
log.Debug("metricDef with id %v already in index", point.MKey)
}
existing.LastUpdate = int64(point.Time)

if existing.LastUpdate < int64(point.Time) {
existing.LastUpdate = int64(point.Time)
}
existing.Partition = partition
statUpdate.Inc()
statUpdateDuration.Value(time.Since(pre))
Expand All @@ -244,7 +247,9 @@ func (m *MemoryIdx) AddOrUpdate(mkey schema.MKey, data *schema.MetricData, parti
if ok {
oldPart := existing.Partition
log.Debug("metricDef with id %s already in index.", mkey)
existing.LastUpdate = data.Time
if existing.LastUpdate < int64(data.Time) {
existing.LastUpdate = int64(data.Time)
}
existing.Partition = partition
statUpdate.Inc()
statUpdateDuration.Value(time.Since(pre))
Expand Down Expand Up @@ -831,8 +836,9 @@ func (m *MemoryIdx) FindByTag(orgId uint32, expressions []string, from int64) ([
m.RLock()
defer m.RUnlock()

// construct the output slice of idx.Node's such that there is only 1 idx.Node for each path
ids := m.idsByTagQuery(orgId, query)
res := make([]idx.Node, 0, len(ids))
byPath := make(map[string]*idx.Node)
for id := range ids {
def, ok := m.defById[id]
if !ok {
Expand All @@ -841,14 +847,25 @@ func (m *MemoryIdx) FindByTag(orgId uint32, expressions []string, from int64) ([
continue
}

res = append(res, idx.Node{
Path: def.NameWithTags(),
Leaf: true,
HasChildren: false,
Defs: []idx.Archive{*def},
})
if existing, ok := byPath[def.NameWithTags()]; !ok {
byPath[def.NameWithTags()] = &idx.Node{
Path: def.NameWithTags(),
Leaf: true,
HasChildren: false,
Defs: []idx.Archive{*def},
}
} else {
existing.Defs = append(existing.Defs, *def)
}
}

results := make([]idx.Node, 0, len(byPath))

for _, v := range byPath {
results = append(results, *v)
}
return res, nil

return results, nil
}

func (m *MemoryIdx) idsByTagQuery(orgId uint32, query TagQuery) IdSet {
Expand Down Expand Up @@ -877,11 +894,13 @@ func (m *MemoryIdx) Find(orgId uint32, pattern string, from int64) ([]idx.Node,
}
log.Debug("memory-idx: %d nodes matching pattern %s found", len(matchedNodes), pattern)
results := make([]idx.Node, 0)
seen := make(map[string]struct{})
byPath := make(map[string]struct{})
// construct the output slice of idx.Node's such that there is only 1 idx.Node
// for each path, and it holds all defs that the Node refers too.
// if there are public (orgId OrgIdPublic) and private leaf nodes with the same series
// path, then the public metricDefs will be excluded.
for _, n := range matchedNodes {
if _, ok := seen[n.Path]; !ok {
if _, ok := byPath[n.Path]; !ok {
idxNode := idx.Node{
Path: n.Path,
Leaf: n.Leaf(),
Expand All @@ -904,7 +923,7 @@ func (m *MemoryIdx) Find(orgId uint32, pattern string, from int64) ([]idx.Node,
}
}
results = append(results, idxNode)
seen[n.Path] = struct{}{}
byPath[n.Path] = struct{}{}
} else {
log.Debug("memory-idx: path %s already seen", n.Path)
}
Expand All @@ -914,6 +933,7 @@ func (m *MemoryIdx) Find(orgId uint32, pattern string, from int64) ([]idx.Node,
return results, nil
}

// find returns all Nodes matching the pattern for the given orgId
func (m *MemoryIdx) find(orgId uint32, pattern string) ([]*Node, error) {
tree, ok := m.tree[orgId]
if !ok {
Expand Down
21 changes: 20 additions & 1 deletion idx/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,20 @@ func testGetAddKey(t *testing.T) {
So(defs, ShouldHaveLength, 15)
})
})

if TagSupport {
Convey("When adding metricDefs with the same series name as existing metricDefs (tagged)", t, func() {
Convey("then findByTag", func() {
nodes, err := ix.FindByTag(1, []string{"name!="}, 0)
So(err, ShouldBeNil)
defs := make([]idx.Archive, 0, len(nodes))
for i := range nodes {
defs = append(defs, nodes[i].Defs...)
}
So(defs, ShouldHaveLength, 2*len(org1Series))
})
})
}
}

func TestFind(t *testing.T) {
Expand Down Expand Up @@ -673,7 +687,12 @@ func TestPruneTaggedSeriesWithCollidingTagSets(t *testing.T) {
Convey("After purge", t, func() {
nodes, err := ix.FindByTag(1, findExpressions, 0)
So(err, ShouldBeNil)
So(nodes, ShouldHaveLength, 2)
So(nodes, ShouldHaveLength, 1)
defs := make([]idx.Archive, 0, len(nodes))
for i := range nodes {
defs = append(defs, nodes[i].Defs...)
}
So(defs, ShouldHaveLength, 2)
})

Convey("When purging newer series", t, func() {
Expand Down

0 comments on commit 6e3d03b

Please sign in to comment.