diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7301a46339..d76ba68ae6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

 ### Added

+- [#849](https://github.com/thanos-io/thanos/pull/849) Thanos Store: added a new experimental `--index-cache-algorithm` flag that selects the index cache's eviction algorithm: `lru` (the default) or `tinylfu`. TinyLFU is a modern admission policy that typically achieves a higher hit ratio under memory pressure. Please test it out and report your results on our issue tracker or in our Slack!
 - [#1687](https://github.com/thanos-io/thanos/pull/1687) Add a new `--grpc-grace-period` CLI option to components which serve gRPC to set how long to wait until gRPC Server shuts down.
 - [#1660](https://github.com/thanos-io/thanos/pull/1660) Add a new `--prometheus.ready_timeout` CLI option to the sidecar to set how long to wait until Prometheus starts up.
 - [#1573](https://github.com/thanos-io/thanos/pull/1573) `AliYun OSS` object storage, see [documents](docs/storage.md#aliyun-oss) for further information.
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index 13833a3a12..eba84160a5 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -39,6 +39,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
 	indexCacheSize := cmd.Flag("index-cache-size", "Maximum size of items held in the index cache.").
 		Default("250MB").Bytes()

+	indexCacheAlgorithm := cmd.Flag("index-cache-algorithm", "Eviction algorithm to use for the index cache. One of: lru, tinylfu.").
+		Default("lru").Hidden().Enum("lru", "tinylfu")
+
 	chunkPoolSize := cmd.Flag("chunk-pool-size", "Maximum size of concurrently allocatable bytes for chunks.").
 		Default("2GB").Bytes()

@@ -87,6 +90,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
 			*httpBindAddr,
 			time.Duration(*httpGracePeriod),
 			uint64(*indexCacheSize),
+			*indexCacheAlgorithm,
 			uint64(*chunkPoolSize),
 			uint64(*maxSampleCount),
 			int(*maxConcurrent),
@@ -120,6 +124,7 @@ func runStore(
 	httpBindAddr string,
 	httpGracePeriod time.Duration,
 	indexCacheSizeBytes uint64,
+	indexCacheAlgorithm string,
 	chunkPoolSizeBytes uint64,
 	maxSampleCount uint64,
 	maxConcurrent int,
@@ -150,6 +155,17 @@
 		return errors.Wrap(err, "create bucket client")
 	}

+	// TODO(bwplotka): Add as a flag?
+	maxItemSizeBytes := indexCacheSizeBytes / 2
+	indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{
+		MaxSizeBytes:     indexCacheSizeBytes,
+		MaxItemSizeBytes: maxItemSizeBytes,
+		Algorithm:        storecache.CacheAlgorithm(indexCacheAlgorithm),
+	})
+	if err != nil {
+		return errors.Wrap(err, "create index cache")
+	}
+
 	relabelContentYaml, err := selectorRelabelConf.Content()
 	if err != nil {
 		return errors.Wrap(err, "get content of relabel configuration")
@@ -167,17 +183,6 @@
 		}
 	}()

-	// TODO(bwplotka): Add as a flag?
-	maxItemSizeBytes := indexCacheSizeBytes / 2
-
-	indexCache, err := storecache.NewIndexCache(logger, reg, storecache.Opts{
-		MaxSizeBytes:     indexCacheSizeBytes,
-		MaxItemSizeBytes: maxItemSizeBytes,
-	})
-	if err != nil {
-		return errors.Wrap(err, "create index cache")
-	}
-
 	bs, err := store.NewBucketStore(
 		logger,
 		reg,
diff --git a/go.mod b/go.mod
index de1d4ed9b9..e0f392d0af 100644
--- a/go.mod
+++ b/go.mod
@@ -21,6 +21,7 @@ require (
 	github.com/cespare/xxhash v1.1.0
 	github.com/cespare/xxhash/v2 v2.1.1 // indirect
 	github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect
+	github.com/dgraph-io/ristretto v0.0.0-20191114170855-99d1bbbf28e6
 	github.com/elastic/go-sysinfo v1.1.1 // indirect
 	github.com/elastic/go-windows v1.0.1 // indirect
 	github.com/evanphx/json-patch v4.5.0+incompatible // indirect
diff --git a/go.sum b/go.sum
index f049dfe801..170eee0840 100644
--- a/go.sum
+++ b/go.sum
@@ -124,10 +124,13 @@ github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/dgrijalva/jwt-go v0.0.0-20160705203006-01aeca54ebda h1:NyywMz59neOoVRFDz+ccfKWxn784fiHMDnZSy6T+JXY=
+github.com/dgraph-io/ristretto v0.0.0-20191114170855-99d1bbbf28e6 h1:liDEMz8LbPxfuI8e/noprwccn6gZGv2rN1AgucbxjHs=
+github.com/dgraph-io/ristretto v0.0.0-20191114170855-99d1bbbf28e6/go.mod h1:T40EBc7CJke8TkpiYfGGKAeFjSaxuFXhuXRyumBd6RE=
 github.com/dgrijalva/jwt-go v0.0.0-20160705203006-01aeca54ebda/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
+github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
+github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
 github.com/dgryski/go-sip13 v0.0.0-20190329191031-25c5027a8c7b/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
 github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
 github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
diff --git a/pkg/store/cache/cache.go b/pkg/store/cache/cache.go
index 364239277b..47db6fcaf5 100644
--- a/pkg/store/cache/cache.go
+++ b/pkg/store/cache/cache.go
@@ -1,12 +1,10 @@
 package storecache

 import (
-	"math"
 	"sync"

 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
-	lru "github.com/hashicorp/golang-lru/simplelru"
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
@@ -53,7 +51,7 @@ type IndexCache struct {
 	mtx sync.Mutex

 	logger           log.Logger
-	lru              *lru.LRU
+	storage          StorageCache
 	maxSizeBytes     uint64
 	maxItemSizeBytes uint64
@@ -69,11 +67,23 @@ type IndexCache struct {
 	overflow         *prometheus.CounterVec
 }

+// CacheAlgorithm is the caching algorithm that is used by the index cache.
+type CacheAlgorithm string
+
+const (
+	// LRUCache is the LRU-based cache.
+	LRUCache CacheAlgorithm = "lru"
+	// TinyLFUCache is the TinyLFU-based cache.
+	TinyLFUCache CacheAlgorithm = "tinylfu"
+)
+
 type Opts struct {
 	// MaxSizeBytes represents overall maximum number of bytes cache can contain.
 	MaxSizeBytes uint64
 	// MaxItemSizeBytes represents maximum size of single item.
 	MaxItemSizeBytes uint64
+	// Algorithm is the cache eviction algorithm to use.
+	Algorithm CacheAlgorithm
 }

 // NewIndexCache creates a new thread-safe LRU cache for index entries and ensures the total cache
@@ -161,24 +171,59 @@ func NewIndexCache(logger log.Logger, reg prometheus.Registerer, opts Opts) (*In
 		reg.MustRegister(c.requests, c.hits, c.added, c.evicted, c.current, c.currentSize, c.totalCurrentSize, c.overflow)
 	}

-	// Initialize LRU cache with a high size limit since we will manage evictions ourselves
-	// based on stored size using `RemoveOldest` method.
-	l, err := lru.NewLRU(math.MaxInt64, c.onEvict)
-	if err != nil {
-		return nil, err
+	if opts.Algorithm == "" {
+		opts.Algorithm = LRUCache
+	}
+
+	switch opts.Algorithm {
+	case LRUCache:
+		// Initialize the LRU cache with a high size limit since we will manage evictions ourselves
+		// based on stored size using `RemoveOldest` method.
+		storage, err := NewSimpleLRU(c.lruOnEvict)
+		if err != nil {
+			return nil, err
+		}
+		c.storage = storage
+	case TinyLFUCache:
+		storage, err := NewTinyLFU(func(key uint64, conflict uint64, val interface{}, cost int64) {
+			entrySize := sliceHeaderSize + cost
+
+			// Extract the key's type, encoded as the last byte of the value.
+			v := val.([]byte)
+			k := v[len(v)-1]
+			var keyType string
+			switch k {
+			case keyTypePostings:
+				keyType = cacheTypePostings
+			case keyTypeSeries:
+				keyType = cacheTypeSeries
+			default:
+				panic("unhandled key type")
+			}
+
+			c.curSize -= uint64(entrySize)
+			c.evicted.WithLabelValues(keyType).Inc()
+			c.current.WithLabelValues(keyType).Dec()
+			c.currentSize.WithLabelValues(keyType).Sub(float64(entrySize))
+			// Each entry also carries a uint64 key hash and a uint64 conflict hash.
+			c.totalCurrentSize.WithLabelValues(keyType).Sub(float64(entrySize + 8 + 8))
+		}, int64(c.maxSizeBytes))
+		if err != nil {
+			return nil, err
+		}
+		c.storage = storage
+	default:
+		return nil, errors.Errorf("unsupported index cache algorithm %q", opts.Algorithm)
 	}
-	c.lru = l

 	level.Info(logger).Log(
 		"msg", "created index cache",
 		"maxItemSizeBytes", c.maxItemSizeBytes,
 		"maxSizeBytes", c.maxSizeBytes,
-		"maxItems", "math.MaxInt64",
 	)
 	return c, nil
 }

-func (c *IndexCache) onEvict(key, val interface{}) {
+func (c *IndexCache) lruOnEvict(key, val interface{}) {
 	k := key.(cacheKey).keyType()
 	entrySize := sliceHeaderSize + uint64(len(val.([]byte)))

@@ -196,7 +241,7 @@ func (c *IndexCache) get(typ string, key cacheKey) ([]byte, bool) {
 	c.mtx.Lock()
 	defer c.mtx.Unlock()

-	v, ok := c.lru.Get(key)
+	v, ok := c.storage.Get(key)
 	if !ok {
 		return nil, false
 	}
@@ -210,7 +255,7 @@ func (c *IndexCache) set(typ string, key cacheKey, val []byte) {
 	c.mtx.Lock()
 	defer c.mtx.Unlock()

-	if _, ok := c.lru.Get(key); ok {
+	if _, ok := c.storage.Get(key); ok {
 		return
 	}

@@ -219,20 +264,41 @@
 		return
 	}

+	var keySize uint64
 	// The caller may be passing in a sub-slice of a huge array. Copy the data
 	// to ensure we don't waste huge amounts of space for something small.
-	v := make([]byte, len(val))
-	copy(v, val)
-	c.lru.Add(key, v)
+	var v []byte
+	if !c.storage.KeyData() {
+		// Reserve one extra byte of capacity for the type marker so the
+		// append below does not leave a stray zero byte in the middle.
+		v = make([]byte, len(val), len(val)+1)
+		copy(v, val)
+		// Encode the key's type as the last byte of the value.
+		switch typ {
+		case cacheTypeSeries:
+			v = append(v, keyTypeSeries)
+		case cacheTypePostings:
+			v = append(v, keyTypePostings)
+		default:
+			panic("unhandled index cache item type")
+		}
+		size++
+		// 2 uint64 hashes per key.
+		keySize = 8 + 8
+	} else {
+		v = make([]byte, len(val))
+		copy(v, val)
+		keySize = key.size()
+	}
+	c.storage.Add(key, v)
+	c.curSize += size

 	c.added.WithLabelValues(typ).Inc()
 	c.currentSize.WithLabelValues(typ).Add(float64(size))
-	c.totalCurrentSize.WithLabelValues(typ).Add(float64(size + key.size()))
+	c.totalCurrentSize.WithLabelValues(typ).Add(float64(size + keySize))
 	c.current.WithLabelValues(typ).Inc()
-	c.curSize += size
 }

-// ensureFits tries to make sure that the passed slice will fit into the LRU cache.
+// ensureFits tries to make sure that the passed slice will fit into the cache.
 // Returns true if it will fit.
 func (c *IndexCache) ensureFits(size uint64, typ string) bool {
 	if size > c.maxItemSizeBytes {
@@ -247,8 +313,13 @@ func (c *IndexCache) ensureFits(size uint64, typ string) bool {
 		return false
 	}

+	// TinyLFU already manages the capacity restrictions for us.
+	if _, ok := c.storage.(*TinyLFU); ok {
+		return true
+	}
+
 	for c.curSize+size > c.maxSizeBytes {
-		if _, _, ok := c.lru.RemoveOldest(); !ok {
+		if _, _, ok := c.storage.RemoveOldest(); !ok {
 			level.Error(c.logger).Log(
 				"msg", "LRU has nothing more to evict, but we still cannot allocate the item. Resetting cache.",
 				"maxItemSizeBytes", c.maxItemSizeBytes,
@@ -264,7 +335,7 @@ func (c *IndexCache) ensureFits(size uint64, typ string) bool {
 }

 func (c *IndexCache) reset() {
-	c.lru.Purge()
+	c.storage.Purge()
 	c.current.Reset()
 	c.currentSize.Reset()
 	c.totalCurrentSize.Reset()
@@ -277,6 +348,7 @@ func (c *IndexCache) SetPostings(b ulid.ULID, l labels.Label, v []byte) {
 	c.set(cacheTypePostings, cacheKey{b, cacheKeyPostings(l)}, v)
 }

+// Postings gets the postings from the index cache as identified by the ULID and labels.
 func (c *IndexCache) Postings(b ulid.ULID, l labels.Label) ([]byte, bool) {
 	return c.get(cacheTypePostings, cacheKey{b, cacheKeyPostings(l)})
 }
@@ -287,6 +359,7 @@ func (c *IndexCache) SetSeries(b ulid.ULID, id uint64, v []byte) {
 	c.set(cacheTypeSeries, cacheKey{b, cacheKeySeries(id)}, v)
 }

+// Series gets the series data from the index cache as identified by the ULID and series ID.
 func (c *IndexCache) Series(b ulid.ULID, id uint64) ([]byte, bool) {
 	return c.get(cacheTypeSeries, cacheKey{b, cacheKeySeries(id)})
 }
diff --git a/pkg/store/cache/cache_test.go b/pkg/store/cache/cache_test.go
index eb87d028e5..a97361b400 100644
--- a/pkg/store/cache/cache_test.go
+++ b/pkg/store/cache/cache_test.go
@@ -25,17 +25,18 @@ func TestIndexCache_AvoidsDeadlock(t *testing.T) {
 	cache, err := NewIndexCache(log.NewNopLogger(), metrics, Opts{
 		MaxItemSizeBytes: sliceHeaderSize + 5,
 		MaxSizeBytes:     sliceHeaderSize + 5,
+		Algorithm:        "lru",
 	})
 	testutil.Ok(t, err)

 	l, err := simplelru.NewLRU(math.MaxInt64, func(key, val interface{}) {
 		// Hack LRU to simulate broken accounting: evictions do not reduce current size.
 		size := cache.curSize
-		cache.onEvict(key, val)
+		cache.lruOnEvict(key, val)
 		cache.curSize = size
 	})
 	testutil.Ok(t, err)
-	cache.lru = l
+	cache.storage = StorageCache(&SimpleLRU{l: l})

 	cache.SetPostings(ulid.MustNew(0, nil), labels.Label{Name: "test2", Value: "1"}, []byte{42, 33, 14, 67, 11})

@@ -77,6 +78,7 @@ func TestIndexCache_UpdateItem(t *testing.T) {
 	cache, err := NewIndexCache(log.NewSyncLogger(errorLogger), metrics, Opts{
 		MaxItemSizeBytes: maxSize,
 		MaxSizeBytes:     maxSize,
+		Algorithm:        "lru",
 	})
 	testutil.Ok(t, err)

@@ -152,12 +154,13 @@ func TestIndexCache_MaxNumberOfItemsHit(t *testing.T) {
 	cache, err := NewIndexCache(log.NewNopLogger(), metrics, Opts{
 		MaxItemSizeBytes: 2*sliceHeaderSize + 10,
 		MaxSizeBytes:     2*sliceHeaderSize + 10,
+		Algorithm:        "lru",
 	})
 	testutil.Ok(t, err)

-	l, err := simplelru.NewLRU(2, cache.onEvict)
+	l, err := simplelru.NewLRU(2, cache.lruOnEvict)
 	testutil.Ok(t, err)
-	cache.lru = l
+	cache.storage = StorageCache(&SimpleLRU{l: l})

 	id := ulid.MustNew(0, nil)

@@ -185,6 +188,7 @@ func TestIndexCache_Eviction_WithMetrics(t *testing.T) {
 	cache, err := NewIndexCache(log.NewNopLogger(), metrics, Opts{
 		MaxItemSizeBytes: 2*sliceHeaderSize + 5,
 		MaxSizeBytes:     2*sliceHeaderSize + 5,
+		Algorithm:        "lru",
 	})
 	testutil.Ok(t, err)

@@ -299,7 +303,7 @@ func TestIndexCache_Eviction_WithMetrics(t *testing.T) {
 	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
 	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries)))

-	_, _, ok = cache.lru.RemoveOldest()
+	_, _, ok = cache.storage.RemoveOldest()
 	testutil.Assert(t, ok, "something to remove")

 	testutil.Equals(t, uint64(0), cache.curSize)
@@ -314,7 +318,7 @@ func TestIndexCache_Eviction_WithMetrics(t *testing.T) {
 	testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
 	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries)))

-	_, _, ok = cache.lru.RemoveOldest()
+	_, _, ok = cache.storage.RemoveOldest()
 	testutil.Assert(t, !ok, "nothing to remove")

 	lbls3 := labels.Label{Name: "test", Value: "124"}
@@ -365,3 +369,28 @@ func TestIndexCache_Eviction_WithMetrics(t *testing.T) {
 	testutil.Equals(t, float64(5), promtest.ToFloat64(cache.hits.WithLabelValues(cacheTypePostings)))
 	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.hits.WithLabelValues(cacheTypeSeries)))
 }
+
+// TestIndexCache_TinyLFU_Smoke runs the smoke tests for TinyLFU.
+func TestIndexCache_TinyLFU_Smoke(t *testing.T) {
+	defer leaktest.CheckTimeout(t, 10*time.Second)()
+
+	metrics := prometheus.NewRegistry()
+	cache, err := NewIndexCache(log.NewNopLogger(), metrics, Opts{
+		MaxItemSizeBytes: 2*sliceHeaderSize + 5,
+		MaxSizeBytes:     2*sliceHeaderSize + 5,
+		Algorithm:        TinyLFUCache,
+	})
+	testutil.Ok(t, err)
+
+	id := ulid.MustNew(0, nil)
+	lbls := labels.Label{Name: "test", Value: "123"}
+
+	_, ok := cache.Postings(id, lbls)
+	testutil.Assert(t, !ok, "no such key")
+
+	cache.SetPostings(id, lbls, []byte{42})
+
+	// Ristretto applies writes asynchronously, so poll briefly instead of
+	// asserting on the very first Get.
+	var val []byte
+	for i := 0; i < 100; i++ {
+		if val, ok = cache.Postings(id, lbls); ok {
+			break
+		}
+		time.Sleep(10 * time.Millisecond)
+	}
+	testutil.Assert(t, ok, "postings not found")
+	testutil.Assert(t, len(val) == 1 && val[0] == 42, "byte slice contains other content")
+}
diff --git a/pkg/store/cache/ristretto.go b/pkg/store/cache/ristretto.go
new file mode 100644
index 0000000000..79ce663ec1
--- /dev/null
+++ b/pkg/store/cache/ristretto.go
@@ -0,0 +1,78 @@
+package storecache
+
+import (
+	"github.com/dgraph-io/ristretto"
+	"github.com/dgraph-io/ristretto/z"
+)
+
+// Values used for encoding the key's type.
+const (
+	keyTypePostings byte = iota
+	keyTypeSeries
+)
+
+// TinyLFU is a wrapper around Ristretto (TinyLFU).
+type TinyLFU struct {
+	l *ristretto.Cache
+}
+
+// Add adds the key with the specified value. Ristretto applies writes
+// asynchronously and may reject items, so Add is best-effort.
+func (t *TinyLFU) Add(key, val interface{}) {
+	v := val.([]byte)
+	t.l.Set(key, val, int64(len(v)))
+}
+
+// Get gets the key's value. Values are stored with the key's type appended
+// as the last byte (see IndexCache.set), so strip it before returning.
+func (t *TinyLFU) Get(key interface{}) (interface{}, bool) {
+	val, ok := t.l.Get(key)
+	if !ok {
+		return nil, false
+	}
+	v := val.([]byte)
+	return v[:len(v)-1], true
+}
+
+// RemoveOldest is a no-op: Ristretto enforces its size limit internally.
+func (t *TinyLFU) RemoveOldest() (interface{}, interface{}, bool) {
+	return nil, nil, false
+}
+
+// Purge clears the whole cache.
+func (t *TinyLFU) Purge() {
+	t.l.Clear()
+}
+
+// KeyData returns false: Ristretto keeps only key hashes, not the keys themselves.
+func (t *TinyLFU) KeyData() bool {
+	return false
+}
+
+// NewTinyLFU returns a new TinyLFU based cache storage which
+// calls the given onEvict on eviction.
+func NewTinyLFU(onEvict func(key uint64, conflict uint64, val interface{}, cost int64), maxSize int64) (StorageCache, error) {
+	// Ad-hoc estimate of the expected number of items: cache size divided
+	// by an assumed ~1KB average item size.
+	ctrs := maxSize / 1000
+	if ctrs < 10 {
+		// Ristretto rejects a zero NumCounters, so enforce a small floor.
+		ctrs = 10
+	}
+	cache, err := ristretto.NewCache(&ristretto.Config{
+		NumCounters: ctrs,
+		MaxCost:     maxSize,
+		BufferItems: 64, // Recommended default per the Ristretto documentation.
+		OnEvict:     onEvict,
+		KeyToHash: func(key interface{}) (uint64, uint64) {
+			k := key.(cacheKey)
+			b := [16]byte(k.block)
+
+			var d uint64
+
+			keyType := k.keyType()
+			switch keyType {
+			case cacheTypePostings:
+				datum := k.key.(cacheKeyPostings)
+				// \xff cannot occur in a valid label name, so it safely
+				// separates the name from the value.
+				d, _ = z.KeyToHash(datum.Name + "\xff" + datum.Value)
+			case cacheTypeSeries:
+				datum := k.key.(cacheKeySeries)
+				d, _ = z.KeyToHash(uint64(datum))
+			}
+			hashedBlock, _ := z.KeyToHash(b[:])
+			return z.KeyToHash(hashedBlock + d)
+		},
+	})
+	if err != nil {
+		return nil, err
+	}
+	return StorageCache(&TinyLFU{l: cache}), nil
+}
diff --git a/pkg/store/cache/simplelru.go b/pkg/store/cache/simplelru.go
new file mode 100644
index 0000000000..a425179899
--- /dev/null
+++ b/pkg/store/cache/simplelru.go
@@ -0,0 +1,49 @@
+package storecache
+
+import (
+	"math"
+
+	lru "github.com/hashicorp/golang-lru/simplelru"
+)
+
+// SimpleLRU is a wrapper around a simple LRU data structure.
+type SimpleLRU struct {
+	l *lru.LRU
+}
+
+// Add adds the key with the specified value.
+func (s *SimpleLRU) Add(key, val interface{}) {
+	s.l.Add(key, val)
+}
+
+// Get gets the key's value.
+func (s *SimpleLRU) Get(key interface{}) (interface{}, bool) {
+	return s.l.Get(key)
+}
+
+// RemoveOldest removes the oldest key.
+func (s *SimpleLRU) RemoveOldest() (interface{}, interface{}, bool) {
+	return s.l.RemoveOldest()
+}
+
+// Purge purges the LRU.
+func (s *SimpleLRU) Purge() {
+	s.l.Purge()
+}
+
+// KeyData returns true: the LRU retains the original keys.
+func (s *SimpleLRU) KeyData() bool {
+	return true
+}
+
+// NewSimpleLRU returns a new simple LRU based cache storage which
+// calls the given onEvict on eviction.
+func NewSimpleLRU(onEvict func(key, val interface{})) (StorageCache, error) {
+	// Initialize the LRU cache with a high size limit since we manage evictions
+	// ourselves based on stored size using the `RemoveOldest` method.
+	l, err := lru.NewLRU(math.MaxInt64, onEvict)
+	if err != nil {
+		return nil, err
+	}
+	return StorageCache(&SimpleLRU{l: l}), nil
+}
diff --git a/pkg/store/cache/storage.go b/pkg/store/cache/storage.go
new file mode 100644
index 0000000000..adfbc4a5bb
--- /dev/null
+++ b/pkg/store/cache/storage.go
@@ -0,0 +1,11 @@
+package storecache
+
+// StorageCache abstracts the storage backend of the index cache behind
+// typical Get()/Add() operations. Some methods may be no-ops in certain
+// implementations.
+type StorageCache interface {
+	Get(key interface{}) (val interface{}, ok bool)
+	Add(key interface{}, val interface{})
+	RemoveOldest() (key interface{}, val interface{}, ok bool)
+	Purge()
+	KeyData() bool // True if the backend retains the original keys rather than only hashes.
+}
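A note on the `StorageCache` abstraction: its contract is easiest to see from a minimal backend. The sketch below is illustration only and is not part of this change; `mapCache` is a hypothetical name. It is an unbounded map with no eviction order, shown purely to make the `RemoveOldest` and `KeyData` semantics that `cache.go` relies on concrete.

```go
package storecache

// mapCache is a hypothetical StorageCache backend used only to illustrate
// the interface contract. It is unbounded and has no eviction order.
type mapCache struct {
	m map[interface{}]interface{}
}

// Add stores the value under the key, overwriting any previous entry.
func (c *mapCache) Add(key, val interface{}) { c.m[key] = val }

// Get returns the value stored under the key, if any.
func (c *mapCache) Get(key interface{}) (interface{}, bool) {
	v, ok := c.m[key]
	return v, ok
}

// RemoveOldest reports false: a plain map tracks no recency, so the caller
// (ensureFits) would treat this backend as having nothing to evict.
func (c *mapCache) RemoveOldest() (interface{}, interface{}, bool) {
	return nil, nil, false
}

// Purge drops all entries.
func (c *mapCache) Purge() { c.m = map[interface{}]interface{}{} }

// KeyData returns true: the map keeps the original cacheKey values, so
// IndexCache does not need to encode the key's type into the value.
func (c *mapCache) KeyData() bool { return true }
```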
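For completeness, here is a minimal sketch of how dependent code would construct and exercise the cache with the new option. Only `NewIndexCache`, `Opts`, `CacheAlgorithm`, and the `SetPostings`/`Postings` accessors from this diff are relied on; the `main` wrapper and the block/label values are illustrative, and the labels import path assumes the Prometheus packages vendored by Thanos at this point.

```go
package main

import (
	"fmt"

	"github.com/go-kit/kit/log"
	"github.com/oklog/ulid"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/pkg/labels"

	storecache "github.com/thanos-io/thanos/pkg/store/cache"
)

func main() {
	// Select TinyLFU explicitly; leaving Algorithm empty falls back to LRU.
	cache, err := storecache.NewIndexCache(log.NewNopLogger(), prometheus.NewRegistry(), storecache.Opts{
		MaxSizeBytes:     250 * 1024 * 1024, // mirrors the --index-cache-size default
		MaxItemSizeBytes: 125 * 1024 * 1024, // mirrors the maxItemSizeBytes heuristic in store.go
		Algorithm:        storecache.TinyLFUCache,
	})
	if err != nil {
		panic(err)
	}

	blockID := ulid.MustNew(0, nil)
	lbl := labels.Label{Name: "job", Value: "api"}

	cache.SetPostings(blockID, lbl, []byte{0x2a})

	// With the TinyLFU backend, Ristretto applies writes asynchronously,
	// so an immediate lookup may still miss.
	if v, ok := cache.Postings(blockID, lbl); ok {
		fmt.Println("cached postings:", v)
	}
}
```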