Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
implement a limit on index.Find
Browse files Browse the repository at this point in the history
note that we also now cache limit breaches in the findcache
  • Loading branch information
Dieterbe committed Oct 16, 2020
1 parent 85821e7 commit b9c3b64
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (t *TestRun) runQuery(pattern string, wg *sync.WaitGroup, active chan struc
}()
pre := time.Now()
active <- struct{}{}
_, err := t.index.Find(orgID, pattern, 0)
_, err := t.index.Find(orgID, pattern, 0, 0)
if err != nil {
log.Printf("Warning: Query failed with error: %s", err)
}
Expand Down
28 changes: 21 additions & 7 deletions idx/memory/find_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ type FindCache struct {
sync.RWMutex
}

type CacheResult struct {
nodes []*Node
err error
}

func NewFindCache(size, invalidateQueueSize, invalidateMaxSize int, invalidateMaxWait, backoffTime time.Duration) *FindCache {
fc := &FindCache{
size: size,
Expand All @@ -80,28 +85,34 @@ func NewFindCache(size, invalidateQueueSize, invalidateMaxSize int, invalidateMa
return fc
}

func (c *FindCache) Get(orgId uint32, pattern string) ([]*Node, bool) {
func (c *FindCache) Get(orgId uint32, pattern string) (CacheResult, bool) {
c.RLock()
cache, ok := c.cache[orgId]
c.RUnlock()
if !ok {
findCacheMiss.Inc()
return nil, ok
return CacheResult{}, ok
}
nodes, ok := cache.Get(pattern)
if !ok {
findCacheMiss.Inc()
return nil, ok
return CacheResult{}, ok
}
findCacheHit.Inc()
return nodes.([]*Node), ok
return nodes.(CacheResult), ok
}

func (c *FindCache) Add(orgId uint32, pattern string, nodes []*Node) {
func (c *FindCache) Add(orgId uint32, pattern string, nodes []*Node, e error) {
c.RLock()
cache, ok := c.cache[orgId]
backoff := c.backoff
c.RUnlock()

cr := CacheResult{
nodes: nodes,
err: e,
}

var err error
if !ok {
// don't init the cache if we are in backoff mode.
Expand All @@ -122,7 +133,7 @@ func (c *FindCache) Add(orgId uint32, pattern string, nodes []*Node) {
}
c.Unlock()
}
cache.Add(pattern, nodes)
cache.Add(pattern, cr)
}

// Purge clears the cache for the specified orgId
Expand Down Expand Up @@ -157,6 +168,9 @@ func (c *FindCache) PurgeAll() {
// goroutines processing the invalidations, we purge the cache and
// disable it for `backoffTime`. Future InvalidateFor calls made during
// the backoff time will then return immediately.
// Note: if we have a cached entry for a find that resulted in "too many series requested"
// we shouldn't have to clear that when entries are added to the cache
// (only when entries are removed). but that's an optimization for later.
func (c *FindCache) InvalidateFor(orgId uint32, path string) {
c.Lock()
findCacheInvalidationsReceived.Inc()
Expand Down Expand Up @@ -275,7 +289,7 @@ func (c *FindCache) processInvalidateQueue() {
}

for _, k := range cache.Keys() {
matches, err := find((*Tree)(tree), k.(string))
matches, err := find((*Tree)(tree), k.(string), 0)
if err != nil {
log.Errorf("memory-idx: checking if cache key %q matches any of the %d invalid patterns resulted in error: %s", k, len(reqs), err)
}
Expand Down
39 changes: 27 additions & 12 deletions idx/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -1270,26 +1270,38 @@ func (m *UnpartitionedMemoryIdx) finalizeResult(res []string, limit uint, dedupl
return res
}

func (m *UnpartitionedMemoryIdx) findMaybeCached(tree *Tree, orgId uint32, pattern string) ([]*Node, error) {
func (m *UnpartitionedMemoryIdx) findMaybeCached(tree *Tree, orgId uint32, pattern string, limit int64) ([]*Node, error) {

if m.findCache == nil {
return find(tree, pattern)
return find(tree, pattern, limit)
}

matchedNodes, ok := m.findCache.Get(orgId, pattern)
cr, ok := m.findCache.Get(orgId, pattern)
if ok {
return matchedNodes, nil
return cr.nodes, cr.err
}

matchedNodes, err := find(tree, pattern)
matchedNodes, err := find(tree, pattern, limit)

// we want to cache bad requests from the user
if err != nil {
return nil, err
_, ok := err.(errors.BadRequest)
if !ok {
return nil, err
}
}
m.findCache.Add(orgId, pattern, matchedNodes)
return matchedNodes, nil
m.findCache.Add(orgId, pattern, matchedNodes, err)
return matchedNodes, err
}

func (m *UnpartitionedMemoryIdx) Find(orgId uint32, pattern string, from, limit int64) ([]idx.Node, error) {

// note that we apply the limit on the raw memory.Node count for find()
// if you have a metric with the same path both in the public tree as well as the tree for your org
// we count it each time, even though we would merge it by path before returning.
// if you have e.g. interval changes which lead to multiple Defs under the same leaf, those are counted
// as one

pre := time.Now()
var matchedNodes []*Node
var err error
Expand All @@ -1301,15 +1313,15 @@ func (m *UnpartitionedMemoryIdx) Find(orgId uint32, pattern string, from, limit
if !ok {
log.Debugf("memory-idx: orgId %d has no metrics indexed.", orgId)
} else {
matchedNodes, err = m.findMaybeCached(tree, orgId, pattern)
matchedNodes, err = m.findMaybeCached(tree, orgId, pattern, limit)
if err != nil {
return nil, err
}
}
if orgId != idx.OrgIdPublic && idx.OrgIdPublic > 0 {
tree, ok = m.tree[idx.OrgIdPublic]
if ok {
publicNodes, err := m.findMaybeCached(tree, idx.OrgIdPublic, pattern)
publicNodes, err := m.findMaybeCached(tree, idx.OrgIdPublic, pattern, limit-int64(len(matchedNodes)))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1366,7 +1378,7 @@ func (m *UnpartitionedMemoryIdx) Find(orgId uint32, pattern string, from, limit
}

// find returns all Nodes matching the pattern for the given tree
func find(tree *Tree, pattern string) ([]*Node, error) {
func find(tree *Tree, pattern string, limit int64) ([]*Node, error) {
var nodes []string
if strings.Index(pattern, ";") == -1 {
nodes = strings.Split(pattern, ".")
Expand Down Expand Up @@ -1438,6 +1450,9 @@ func find(tree *Tree, pattern string) ([]*Node, error) {
return nil, errors.NewInternal("hit an empty path in the index")
}

if int64(len(grandChildren)) == limit-1 {
return nil, errors.NewBadRequest("limit exhausted")
}
grandChildren = append(grandChildren, grandChild)
}
}
Expand Down Expand Up @@ -1547,7 +1562,7 @@ func (m *UnpartitionedMemoryIdx) Delete(orgId uint32, pattern string) ([]idx.Arc
if !ok {
return nil, nil
}
found, err := find(tree, pattern)
found, err := find(tree, pattern, 0)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit b9c3b64

Please sign in to comment.