Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
make "public org" explicit + optimize find in case orgIdPublic passed
Browse files Browse the repository at this point in the history
  • Loading branch information
Dieterbe committed Mar 30, 2018
1 parent a7ebc62 commit 14d0406
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 24 deletions.
2 changes: 1 addition & 1 deletion idx/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ func (c *CasIdx) prune() {
for range ticker.C {
log.Debug("cassandra-idx: pruning items from index that have not been seen for %s", maxStale.String())
staleTs := time.Now().Add(maxStale * -1)
_, err := c.Prune(-1, staleTs)
_, err := c.Prune(idx.OrgIdPublic, staleTs)
if err != nil {
log.Error(3, "cassandra-idx: prune error. %s", err)
}
Expand Down
6 changes: 3 additions & 3 deletions idx/cassandra/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestGetAddKey(t *testing.T) {
ix := New()
initForTests(ix)

publicSeries := getMetricData(-1, 2, 5, 10, "metric.public")
publicSeries := getMetricData(idx.OrgIdPublic, 2, 5, 10, "metric.public")
org1Series := getMetricData(1, 2, 5, 10, "metric.org1")
org2Series := getMetricData(2, 2, 5, 10, "metric.org2")

Expand All @@ -138,7 +138,7 @@ func TestGetAddKey(t *testing.T) {
Convey(fmt.Sprintf("Then listing metrics for OrgId %d", orgId), func() {
defs := ix.List(orgId)
numSeries := len(series)
if orgId != -1 {
if orgId != idx.OrgIdPublic {
numSeries += 5
}
So(defs, ShouldHaveLength, numSeries)
Expand Down Expand Up @@ -258,7 +258,7 @@ func TestAddToWriteQueue(t *testing.T) {
func TestFind(t *testing.T) {
ix := New()
initForTests(ix)
for _, s := range getMetricData(-1, 2, 5, 10, "metric.demo") {
for _, s := range getMetricData(idx.OrgIdPublic, 2, 5, 10, "metric.demo") {
ix.AddOrUpdate(s, 1)
}
for _, s := range getMetricData(1, 2, 5, 10, "metric.demo") {
Expand Down
8 changes: 5 additions & 3 deletions idx/idx.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
schema "gopkg.in/raintank/schema.v1"
)

const OrgIdPublic = -1

var (
BothBranchAndLeaf = errors.New("node can't be both branch and leaf")
BranchUnderLeaf = errors.New("can't add branch under leaf")
Expand Down Expand Up @@ -118,18 +120,18 @@ type MetricIndex interface {

// Find searches the index. The method is passed an OrgId, a query
// pattern and a unix timestamp. Searches should return all nodes that match for
// the given OrgId and OrgId -1. The pattern should be handled in the same way
// the given OrgId and OrgIdPublic. The pattern should be handled in the same way
// Graphite would. see https://graphite.readthedocs.io/en/latest/render_api.html#paths-and-wildcards
// And the unix stimestamp is used to ignore series that have been stale since
// the timestamp.
Find(int, string, int64) ([]Node, error)

// List returns all Archives for the passed OrgId, or for all organisations if -1 is provided.
// List returns all Archives for the passed OrgId, or for all organisations if OrgIdPublic
List(int) []Archive

// Prune deletes all metrics from the index for the passed org where
// the last time the metric was seen is older then the passed timestamp. If the org
// passed is -1, then the all orgs should be examined for stale metrics to be deleted.
// is OrgIdPublic, then the all orgs should be examined for stale metrics to be deleted.
// It returns all Archives deleted and any error encountered.
Prune(int, time.Time) ([]Archive, error)

Expand Down
18 changes: 10 additions & 8 deletions idx/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,15 +836,17 @@ func (m *MemoryIdx) Find(orgId int, pattern string, from int64) ([]idx.Node, err
if err != nil {
return nil, err
}
publicNodes, err := m.find(-1, pattern)
if err != nil {
return nil, err
if orgId != idx.OrgIdPublic {
publicNodes, err := m.find(idx.OrgIdPublic, pattern)
if err != nil {
return nil, err
}
}
matchedNodes = append(matchedNodes, publicNodes...)
log.Debug("memory-idx: %d nodes matching pattern %s found", len(matchedNodes), pattern)
results := make([]idx.Node, 0)
seen := make(map[string]struct{})
// if there are public (orgId -1) and private leaf nodes with the same series
// if there are public (orgId OrgIdPublic) and private leaf nodes with the same series
// path, then the public metricDefs will be excluded.
for _, n := range matchedNodes {
if _, ok := seen[n.Path]; !ok {
Expand Down Expand Up @@ -978,7 +980,7 @@ func (m *MemoryIdx) List(orgId int) []idx.Archive {
defer m.RUnlock()

orgs := make(map[int]struct{})
if orgId == -1 {
if orgId == idx.OrgIdPublic {
log.Info("memory-idx: returning all metricDefs for all orgs")
for org := range m.tree {
orgs[org] = struct{}{}
Expand All @@ -987,7 +989,7 @@ func (m *MemoryIdx) List(orgId int) []idx.Archive {
orgs[org] = struct{}{}
}
} else {
orgs[-1] = struct{}{}
orgs[idx.OrgIdPublic] = struct{}{}
orgs[orgId] = struct{}{}
}

Expand Down Expand Up @@ -1190,7 +1192,7 @@ func (m *MemoryIdx) delete(orgId int, n *Node, deleteEmptyParents, deleteChildre
func (m *MemoryIdx) Prune(orgId int, oldest time.Time) ([]idx.Archive, error) {
oldestUnix := oldest.Unix()
orgs := make(map[int]struct{})
if orgId == -1 {
if orgId == idx.OrgIdPublic {
log.Info("memory-idx: pruning stale metricDefs across all orgs")
m.RLock()
for org := range m.tree {
Expand Down Expand Up @@ -1300,7 +1302,7 @@ DEFS:

statMetricsActive.Add(-1 * len(pruned))

if orgId == -1 {
if orgId == idx.OrgIdPublic {
log.Info("memory-idx: pruning stale metricDefs from memory for all orgs took %s", time.Since(pre).String())
}

Expand Down
18 changes: 9 additions & 9 deletions idx/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func testGetAddKey(t *testing.T) {
ix := New()
ix.Init()

publicSeries := getMetricData(-1, 2, 5, 10, "metric.public", false)
publicSeries := getMetricData(idx.OrgIdPublic, 2, 5, 10, "metric.public", false)
org1Series := getMetricData(1, 2, 5, 10, "metric.org1", false)
org2Series := getMetricData(2, 2, 5, 10, "metric.org2", false)

Expand All @@ -99,7 +99,7 @@ func testGetAddKey(t *testing.T) {
Convey(fmt.Sprintf("Then listing metrics for OrgId %d", orgId), func() {
defs := ix.List(orgId)
numSeries := len(series)
if orgId != -1 {
if orgId != idx.OrgIdPublic {
numSeries += 5
}
So(defs, ShouldHaveLength, numSeries)
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestFind(t *testing.T) {
func testFind(t *testing.T) {
ix := New()
ix.Init()
for _, s := range getMetricData(-1, 2, 5, 10, "metric.demo", false) {
for _, s := range getMetricData(idx.OrgIdPublic, 2, 5, 10, "metric.demo", false) {
s.Time = 10 * 86400
ix.AddOrUpdate(s, 1)
}
Expand Down Expand Up @@ -256,7 +256,7 @@ func testDelete(t *testing.T) {
ix := New()
ix.Init()

publicSeries := getMetricData(-1, 2, 5, 10, "metric.public", false)
publicSeries := getMetricData(idx.OrgIdPublic, 2, 5, 10, "metric.public", false)
org1Series := getMetricData(1, 2, 5, 10, "metric.org1", false)

for _, s := range publicSeries {
Expand All @@ -271,7 +271,7 @@ func TestDeleteTagged(t *testing.T) {
ix := New()
ix.Init()

publicSeries := getMetricData(-1, 2, 5, 10, "metric.public", true)
publicSeries := getMetricData(idx.OrgIdPublic, 2, 5, 10, "metric.public", true)
org1Series := getMetricData(1, 2, 5, 10, "metric.org1", true)

for _, s := range publicSeries {
Expand Down Expand Up @@ -506,7 +506,7 @@ func TestPruneTaggedSeries(t *testing.T) {
}

Convey("after populating index", t, func() {
defs := ix.List(-1)
defs := ix.List(idx.OrgIdPublic)
So(defs, ShouldHaveLength, 10)
})

Expand All @@ -523,7 +523,7 @@ func TestPruneTaggedSeries(t *testing.T) {
})

Convey("after purge", t, func() {
defs := ix.List(-1)
defs := ix.List(idx.OrgIdPublic)
So(defs, ShouldHaveLength, 5)
data := &schema.MetricData{
Name: defs[0].Name,
Expand Down Expand Up @@ -638,7 +638,7 @@ func testPrune(t *testing.T) {
ix.AddOrUpdate(d, 1)
}
Convey("after populating index", t, func() {
defs := ix.List(-1)
defs := ix.List(idx.OrgIdPublic)
So(defs, ShouldHaveLength, 10)
})
Convey("When purging old series", t, func() {
Expand All @@ -654,7 +654,7 @@ func testPrune(t *testing.T) {

})
Convey("after purge", t, func() {
defs := ix.List(-1)
defs := ix.List(idx.OrgIdPublic)
So(defs, ShouldHaveLength, 5)
data := &schema.MetricData{
Name: defs[0].Name,
Expand Down

0 comments on commit 14d0406

Please sign in to comment.