Factor out the index cache implementation, add ristretto #849

Closed
wants to merge 19 commits into from
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

### Added

- [#849](https://github.com/thanos-io/thanos/pull/849) Thanos Store: added an experimental `--index-cache-algorithm` flag for choosing the index cache storage algorithm, either `lru` (the default) or `tinylfu`. The `tinylfu` option is backed by the Ristretto library and its TinyLFU admission policy (Einziger et al.), which typically sustains a higher hit ratio under memory pressure. Please test it and report results on our issue tracker or in our Slack. A wiring sketch follows this list.
- [#1687](https://github.com/thanos-io/thanos/pull/1687) Add a new `--grpc-grace-period` CLI option to components which serve gRPC to set how long to wait until gRPC Server shuts down.
- [#1660](https://github.com/thanos-io/thanos/pull/1660) Add a new `--prometheus.ready_timeout` CLI option to the sidecar to set how long to wait until Prometheus starts up.
- [#1573](https://github.com/thanos-io/thanos/pull/1573) `AliYun OSS` object storage, see [documents](docs/storage.md#aliyun-oss) for further information.
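For reference, a minimal sketch of how the flag's value reaches the new cache constructor, distilled from the cmd/thanos/store.go and pkg/store/cache/cache.go changes below (all identifiers are taken from this PR):

	// Sketch only; mirrors the wiring in runStore below.
	indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{
		MaxSizeBytes:     indexCacheSizeBytes,     // --index-cache-size
		MaxItemSizeBytes: indexCacheSizeBytes / 2, // not yet a flag, see the TODO below
		Algorithm:        storecache.CacheAlgorithm(indexCacheAlgorithm), // "lru" or "tinylfu"
	})
	if err != nil {
		return errors.Wrap(err, "create index cache")
	}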
27 changes: 16 additions & 11 deletions cmd/thanos/store.go
@@ -39,6 +39,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
indexCacheSize := cmd.Flag("index-cache-size", "Maximum size of items held in the index cache.").
Default("250MB").Bytes()

indexCacheAlgorithm := cmd.Flag("index-cache-algorithm", "Algorithm to use for the index cache.").
Default("lru").Hidden().Enum("lru", "tinylfu")

chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes for chunks.").
Default("2GB").Bytes()

@@ -87,6 +90,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
*httpBindAddr,
time.Duration(*httpGracePeriod),
uint64(*indexCacheSize),
*indexCacheAlgorithm,
uint64(*chunkPoolSize),
uint64(*maxSampleCount),
int(*maxConcurrent),
@@ -120,6 +124,7 @@ func runStore(
httpBindAddr string,
httpGracePeriod time.Duration,
indexCacheSizeBytes uint64,
indexCacheAlgorithm string,
chunkPoolSizeBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
@@ -150,6 +155,17 @@ func runStore(
return errors.Wrap(err, "create bucket client")
}

// TODO(bwplotka): Add as a flag?
maxItemSizeBytes := indexCacheSizeBytes / 2
indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{
MaxSizeBytes: indexCacheSizeBytes,
MaxItemSizeBytes: maxItemSizeBytes,
Algorithm: storecache.CacheAlgorithm(indexCacheAlgorithm),
})
if err != nil {
return errors.Wrap(err, "create index cache")
}

relabelContentYaml, err := selectorRelabelConf.Content()
if err != nil {
return errors.Wrap(err, "get content of relabel configuration")
@@ -167,17 +183,6 @@
}
}()

bs, err := store.NewBucketStore(
logger,
reg,
1 change: 1 addition & 0 deletions go.mod
@@ -21,6 +21,7 @@ require (
github.com/cespare/xxhash v1.1.0
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect
github.com/dgraph-io/ristretto v0.0.0-20191114170855-99d1bbbf28e6
github.com/elastic/go-sysinfo v1.1.1 // indirect
github.com/elastic/go-windows v1.0.1 // indirect
github.com/evanphx/json-patch v4.5.0+incompatible // indirect
5 changes: 4 additions & 1 deletion go.sum
@@ -124,10 +124,13 @@ github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.0.0-20191114170855-99d1bbbf28e6 h1:liDEMz8LbPxfuI8e/noprwccn6gZGv2rN1AgucbxjHs=
github.com/dgraph-io/ristretto v0.0.0-20191114170855-99d1bbbf28e6/go.mod h1:T40EBc7CJke8TkpiYfGGKAeFjSaxuFXhuXRyumBd6RE=
github.com/dgrijalva/jwt-go v0.0.0-20160705203006-01aeca54ebda/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-sip13 v0.0.0-20190329191031-25c5027a8c7b/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
115 changes: 94 additions & 21 deletions pkg/store/cache/cache.go
@@ -1,12 +1,10 @@
package storecache

import (
"math"
"sync"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
lru "github.com/hashicorp/golang-lru/simplelru"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@@ -53,7 +51,7 @@ type IndexCache struct {
mtx sync.Mutex

logger log.Logger
storage StorageCache
maxSizeBytes uint64
maxItemSizeBytes uint64

@@ -69,11 +67,23 @@ type IndexCache struct {
overflow *prometheus.CounterVec
}

// CacheAlgorithm is the caching algorithm that is used by the index cache.
type CacheAlgorithm string

const (
// LRUCache is the LRU-based cache.
LRUCache CacheAlgorithm = "lru"
// TinyLFUCache is the TinyLFU-based cache.
TinyLFUCache CacheAlgorithm = "tinylfu"
)

type Opts struct {
// MaxSizeBytes represents the overall maximum number of bytes the cache can contain.
MaxSizeBytes uint64
// MaxItemSizeBytes represents the maximum size of a single item.
MaxItemSizeBytes uint64
// Algorithm is the cache algorithm that will be used.
Algorithm CacheAlgorithm
}

// NewIndexCache creates a new thread-safe cache for index entries and ensures the total cache
@@ -161,24 +171,59 @@ func NewIndexCache(logger log.Logger, reg prometheus.Registerer, opts Opts) (*In
reg.MustRegister(c.requests, c.hits, c.added, c.evicted, c.current, c.currentSize, c.totalCurrentSize, c.overflow)
}

if opts.Algorithm == "" {
	opts.Algorithm = LRUCache
}

switch opts.Algorithm {
case LRUCache:
	// Initialize the LRU cache with a high size limit since we will manage evictions ourselves
	// based on stored size using the `RemoveOldest` method.
	storage, err := NewSimpleLRU(c.lruOnEvict)
	if err != nil {
		return nil, err
	}
	c.storage = storage
case TinyLFUCache:
	storage, err := NewTinyLFU(func(key uint64, conflict uint64, val interface{}, cost int64) {
		// Mirror the accounting done in set(): slice header plus value length (cost).
		entrySize := sliceHeaderSize + cost

		// Extract the key's type encoded as the last byte of the value.
		v := val.([]byte)
		k := v[len(v)-1]
		var keyType string
		switch k {
		case keyTypePostings:
			keyType = cacheTypePostings
		case keyTypeSeries:
			keyType = cacheTypeSeries
		default:
			panic("unhandled key type")
		}

		c.curSize -= uint64(entrySize)
		c.evicted.WithLabelValues(keyType).Inc()
		c.current.WithLabelValues(keyType).Dec()
		c.currentSize.WithLabelValues(keyType).Sub(float64(entrySize))
		// Each entry also carries a uint64 key and a uint64 conflict hash.
		c.totalCurrentSize.WithLabelValues(keyType).Sub(float64(entrySize + 8 + 8))
	}, int64(c.maxSizeBytes))
	if err != nil {
		return nil, err
	}
	c.storage = storage
default:
	// Reject unknown algorithms instead of leaving c.storage nil.
	return nil, errors.Errorf("unsupported index cache algorithm %s", opts.Algorithm)
}

level.Info(logger).Log(
"msg", "created index cache",
"maxItemSizeBytes", c.maxItemSizeBytes,
"maxSizeBytes", c.maxSizeBytes,
"maxItems", "math.MaxInt64",
)
return c, nil
}
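Note: the StorageCache interface that replaces the concrete *lru.LRU field is defined elsewhere in this PR and does not appear in this diff. Inferred from the calls this file makes on c.storage, it presumably looks roughly like:

	// Hypothetical reconstruction; the real definition lives in another file of this PR.
	type StorageCache interface {
		Get(key interface{}) (interface{}, bool)
		Add(key, val interface{})
		// RemoveOldest evicts a single entry; only meaningful for the LRU backend.
		RemoveOldest() (key interface{}, val interface{}, ok bool)
		// Purge drops all entries.
		Purge()
		// KeyData reports whether the backend keeps original key data. Ristretto only
		// keeps uint64 hashes, so for TinyLFU this returns false and set() must encode
		// the entry type into the value itself.
		KeyData() bool
	}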

func (c *IndexCache) lruOnEvict(key, val interface{}) {
k := key.(cacheKey).keyType()
entrySize := sliceHeaderSize + uint64(len(val.([]byte)))

@@ -196,7 +241,7 @@ func (c *IndexCache) get(typ string, key cacheKey) ([]byte, bool) {
c.mtx.Lock()
defer c.mtx.Unlock()

v, ok := c.storage.Get(key)
if !ok {
return nil, false
}
@@ -210,7 +255,7 @@ func (c *IndexCache) set(typ string, key cacheKey, val []byte) {
c.mtx.Lock()
defer c.mtx.Unlock()

if _, ok := c.storage.Get(key); ok {
return
}

@@ -219,20 +264,41 @@ func (c *IndexCache) set(typ string, key cacheKey, val []byte) {
return
}

var keySize uint64
// The caller may be passing in a sub-slice of a huge array. Copy the data
// to ensure we don't waste huge amounts of space for something small.
var v []byte
if !c.storage.KeyData() {
	// The backend keeps only uint64 hashes of keys, so encode the key's type
	// as a trailing byte of the value for use in the eviction callback.
	v = make([]byte, len(val), len(val)+1)
	copy(v, val)
	switch typ {
	case cacheTypeSeries:
		v = append(v, keyTypeSeries)
	case cacheTypePostings:
		v = append(v, keyTypePostings)
	default:
		panic("unhandled index cache item type")
	}
	size++
	// A uint64 key plus a uint64 conflict hash are stored per entry.
	keySize = 8 + 8
} else {
	v = make([]byte, len(val))
	copy(v, val)
	keySize = key.size()
}
c.storage.Add(key, v)
c.curSize += size

c.added.WithLabelValues(typ).Inc()
c.currentSize.WithLabelValues(typ).Add(float64(size))
c.totalCurrentSize.WithLabelValues(typ).Add(float64(size + keySize))
c.current.WithLabelValues(typ).Inc()
}
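Because Ristretto cannot hand the original key back to the eviction callback, set() appends the entry's type as the value's trailing byte and the TinyLFU eviction callback recovers it from there. A self-contained sketch of that round trip (the byte constants here are hypothetical; the PR defines its own keyTypePostings/keyTypeSeries):

	package main

	import "fmt"

	// Hypothetical values; the PR defines its own package-level constants.
	const (
		keyTypePostings byte = 0
		keyTypeSeries   byte = 1
	)

	// encode appends the entry type as the value's trailing byte.
	func encode(val []byte, typ byte) []byte {
		v := make([]byte, len(val), len(val)+1)
		copy(v, val)
		return append(v, typ)
	}

	// decode splits the stored bytes back into payload and type byte.
	func decode(v []byte) ([]byte, byte) {
		return v[:len(v)-1], v[len(v)-1]
	}

	func main() {
		stored := encode([]byte("postings-data"), keyTypePostings)
		payload, typ := decode(stored)
		fmt.Printf("%s %d\n", payload, typ) // prints: postings-data 0
	}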

// ensureFits tries to make sure that the passed slice will fit into the cache.
// Returns true if it will fit.
func (c *IndexCache) ensureFits(size uint64, typ string) bool {
if size > c.maxItemSizeBytes {
@@ -247,8 +313,13 @@ func (c *IndexCache) ensureFits(size uint64, typ string) bool {
return false
}

// TinyLFU already manages the capacity restrictions for us.
if _, ok := c.storage.(*TinyLFU); ok {
return true
}

for c.curSize+size > c.maxSizeBytes {
if _, _, ok := c.storage.RemoveOldest(); !ok {
level.Error(c.logger).Log(
"msg", "LRU has nothing more to evict, but we still cannot allocate the item. Resetting cache.",
"maxItemSizeBytes", c.maxItemSizeBytes,
@@ -264,7 +335,7 @@ func (c *IndexCache) ensureFits(size uint64, typ string) bool {
}
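NewTinyLFU is likewise defined outside this diff. The callback handed to it in NewIndexCache matches ristretto's OnEvict signature at the revision pinned in go.mod, so the wrapper plausibly looks something like the following sketch (the Config values here are assumptions, not the PR's actual choices):

	// Hypothetical sketch of the TinyLFU wrapper around ristretto.
	func NewTinyLFU(onEvict func(key, conflict uint64, val interface{}, cost int64), maxCost int64) (*TinyLFU, error) {
		c, err := ristretto.NewCache(&ristretto.Config{
			NumCounters: 1e7,     // assumed: frequency counters for ~1M distinct items
			MaxCost:     maxCost, // total cache budget in bytes
			BufferItems: 64,      // ristretto's documented default
			OnEvict:     onEvict,
		})
		if err != nil {
			return nil, err
		}
		return &TinyLFU{c: c}, nil
	}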

func (c *IndexCache) reset() {
c.storage.Purge()
c.current.Reset()
c.currentSize.Reset()
c.totalCurrentSize.Reset()
@@ -277,6 +348,7 @@ func (c *IndexCache) SetPostings(b ulid.ULID, l labels.Label, v []byte) {
c.set(cacheTypePostings, cacheKey{b, cacheKeyPostings(l)}, v)
}

// Postings gets the postings from the index cache as identified by the ulid and labels.
func (c *IndexCache) Postings(b ulid.ULID, l labels.Label) ([]byte, bool) {
return c.get(cacheTypePostings, cacheKey{b, cacheKeyPostings(l)})
}
@@ -287,6 +359,7 @@ func (c *IndexCache) SetSeries(b ulid.ULID, id uint64, v []byte) {
c.set(cacheTypeSeries, cacheKey{b, cacheKeySeries(id)}, v)
}

// Series gets the series data from the index cache as identified by the ulid and series id.
func (c *IndexCache) Series(b ulid.ULID, id uint64) ([]byte, bool) {
return c.get(cacheTypeSeries, cacheKey{b, cacheKeySeries(id)})
}
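Finally, a hypothetical smoke test exercising both algorithms through the public API added in this diff (not part of the PR; the go-kit and Prometheus labels import paths are assumed to match what Thanos uses at this revision):

	package storecache_test

	import (
		"crypto/rand"
		"testing"

		"github.com/go-kit/kit/log"
		"github.com/oklog/ulid"
		"github.com/prometheus/prometheus/pkg/labels"

		storecache "github.com/thanos-io/thanos/pkg/store/cache"
	)

	func TestIndexCacheAlgorithms(t *testing.T) {
		id := ulid.MustNew(ulid.Now(), rand.Reader)
		lbl := labels.Label{Name: "foo", Value: "bar"}

		for _, algo := range []storecache.CacheAlgorithm{storecache.LRUCache, storecache.TinyLFUCache} {
			c, err := storecache.NewIndexCache(log.NewNopLogger(), nil, storecache.Opts{
				MaxSizeBytes:     1024 * 1024,
				MaxItemSizeBytes: 512 * 1024,
				Algorithm:        algo,
			})
			if err != nil {
				t.Fatalf("%s: %v", algo, err)
			}

			c.SetPostings(id, lbl, []byte("test-postings"))
			if _, ok := c.Postings(id, lbl); !ok {
				// Ristretto's Set is buffered and admission is probabilistic, so a miss
				// right after Set is possible for tinylfu; log instead of failing.
				t.Logf("%s: entry was not admitted", algo)
			}
		}
	}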