Skip to content

Commit

Permalink
V1.0.3 bugfix (#410)
Browse files Browse the repository at this point in the history
* remove errant locks release

* make atime asynchronous

* use new context for all upstream requests

* named locks optimizations

* cache index bulk remove optimizations

* bump version to 1.0.3
  • Loading branch information
James Ranson authored Apr 16, 2020
1 parent 4b6f13b commit 41fbf79
Show file tree
Hide file tree
Showing 20 changed files with 152 additions and 89 deletions.
2 changes: 1 addition & 1 deletion cmd/trickster/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ var (

const (
applicationName = "trickster"
applicationVersion = "1.0.2"
applicationVersion = "1.0.3"
)

// Package main is the main package for the Trickster application
Expand Down
2 changes: 1 addition & 1 deletion internal/cache/badger/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (c *Cache) Remove(cacheKey string) {
}

// BulkRemove removes a list of objects from the cache. noLock is not used for Badger
func (c *Cache) BulkRemove(cacheKeys []string, noLock bool) {
func (c *Cache) BulkRemove(cacheKeys []string) {
log.Debug("badger cache bulk remove", log.Pairs{})

c.dbh.Update(func(txn *badger.Txn) error {
Expand Down
4 changes: 2 additions & 2 deletions internal/cache/badger/badger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ func TestBadgerCache_BulkRemove(t *testing.T) {
t.Errorf("expected %s got %s", status.LookupStatusHit, ls)
}

bc.BulkRemove([]string{""}, true)
bc.BulkRemove([]string{cacheKey}, true)
bc.BulkRemove([]string{""})
bc.BulkRemove([]string{cacheKey})

// it should be a cache miss
_, ls, err = bc.Retrieve(cacheKey, false)
Expand Down
12 changes: 7 additions & 5 deletions internal/cache/bbolt/bbolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (c *Cache) retrieve(cacheKey string, allowExpired bool, atime bool) ([]byte
if allowExpired || o.Expiration.IsZero() || o.Expiration.After(time.Now()) {
log.Debug("bbolt cache retrieve", log.Pairs{"cacheKey": cacheKey})
if atime {
c.Index.UpdateObjectAccessTime(cacheKey)
go c.Index.UpdateObjectAccessTime(cacheKey)
}
cache.ObserveCacheOperation(c.Name, c.Config.CacheType, "get", "hit", float64(len(data)))
locks.Release(lockPrefix + cacheKey)
Expand Down Expand Up @@ -187,7 +187,7 @@ func (c *Cache) Remove(cacheKey string) {
locks.Release(lockPrefix + cacheKey)
}

func (c *Cache) remove(cacheKey string, noLock bool) error {
func (c *Cache) remove(cacheKey string, isBulk bool) error {

err := c.dbh.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(c.Config.BBolt.Bucket))
Expand All @@ -197,16 +197,18 @@ func (c *Cache) remove(cacheKey string, noLock bool) error {
log.Error("bbolt cache key delete failure", log.Pairs{"cacheKey": cacheKey, "reason": err.Error()})
return err
}
c.Index.RemoveObject(cacheKey, noLock)
if !isBulk {
c.Index.RemoveObject(cacheKey)
}
cache.ObserveCacheDel(c.Name, c.Config.CacheType, 0)
log.Debug("bbolt cache key delete", log.Pairs{"key": cacheKey})
return nil
}

// BulkRemove removes a list of objects from the cache
func (c *Cache) BulkRemove(cacheKeys []string, noLock bool) {
func (c *Cache) BulkRemove(cacheKeys []string) {
for _, cacheKey := range cacheKeys {
c.remove(cacheKey, noLock)
c.remove(cacheKey, true)
}
}

Expand Down
4 changes: 2 additions & 2 deletions internal/cache/bbolt/bbolt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func TestBboltCache_BulkRemove(t *testing.T) {
if ls != status.LookupStatusHit {
t.Errorf("expected %s got %s", status.LookupStatusHit, ls)
}
bc.BulkRemove([]string{cacheKey}, true)
bc.BulkRemove([]string{cacheKey})

// it should be a cache miss
_, ls, err = bc.Retrieve(cacheKey, false)
Expand All @@ -436,7 +436,7 @@ func BenchmarkCache_BulkRemove(b *testing.B) {
keyArray = append(keyArray, cacheKey+strconv.Itoa(n))
}

bc.BulkRemove(keyArray, true)
bc.BulkRemove(keyArray)

// it should be a cache miss
for n := 0; n < b.N; n++ {
Expand Down
4 changes: 2 additions & 2 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Cache interface {
Retrieve(cacheKey string, allowExpired bool) ([]byte, status.LookupStatus, error)
SetTTL(cacheKey string, ttl time.Duration)
Remove(cacheKey string)
BulkRemove(cacheKeys []string, noLock bool)
BulkRemove(cacheKeys []string)
Close() error
Configuration() *config.CachingConfig
}
Expand All @@ -49,7 +49,7 @@ type MemoryCache interface {
Retrieve(cacheKey string, allowExpired bool) ([]byte, status.LookupStatus, error)
SetTTL(cacheKey string, ttl time.Duration)
Remove(cacheKey string)
BulkRemove(cacheKeys []string, noLock bool)
BulkRemove(cacheKeys []string)
Close() error
Configuration() *config.CachingConfig
StoreReference(cacheKey string, data ReferenceObject, ttl time.Duration) error
Expand Down
12 changes: 6 additions & 6 deletions internal/cache/filesystem/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (c *Cache) retrieve(cacheKey string, allowExpired bool, atime bool) ([]byte
if allowExpired || o.Expiration.IsZero() || o.Expiration.After(time.Now()) {
log.Debug("filesystem cache retrieve", log.Pairs{"key": cacheKey, "dataFile": dataFile})
if atime {
c.Index.UpdateObjectAccessTime(cacheKey)
go c.Index.UpdateObjectAccessTime(cacheKey)
}
cache.ObserveCacheOperation(c.Name, c.Config.CacheType, "get", "hit", float64(len(data)))
locks.Release(lockPrefix + cacheKey)
Expand All @@ -163,18 +163,18 @@ func (c *Cache) Remove(cacheKey string) {
locks.Release(lockPrefix + cacheKey)
}

func (c *Cache) remove(cacheKey string, noLock bool) {
func (c *Cache) remove(cacheKey string, isBulk bool) {

if err := os.Remove(c.getFileName(cacheKey)); err == nil {
c.Index.RemoveObject(cacheKey, noLock)
if err := os.Remove(c.getFileName(cacheKey)); err == nil && !isBulk {
c.Index.RemoveObject(cacheKey)
}
cache.ObserveCacheDel(c.Name, c.Config.CacheType, 0)
}

// BulkRemove removes a list of objects from the cache
func (c *Cache) BulkRemove(cacheKeys []string, noLock bool) {
func (c *Cache) BulkRemove(cacheKeys []string) {
for _, cacheKey := range cacheKeys {
c.remove(cacheKey, noLock)
c.remove(cacheKey, true)
}
}

Expand Down
4 changes: 2 additions & 2 deletions internal/cache/filesystem/filesystem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ func TestFilesystemCache_BulkRemove(t *testing.T) {
t.Errorf("expected %s got %s", status.LookupStatusHit, ls)
}

fc.BulkRemove([]string{cacheKey}, true)
fc.BulkRemove([]string{cacheKey})

// it should be a cache miss
_, ls, err = fc.Retrieve(cacheKey, false)
Expand All @@ -591,7 +591,7 @@ func BenchmarkCache_BulkRemove(b *testing.B) {
keyArray = append(keyArray, cacheKey+strconv.Itoa(n))
}

fc.BulkRemove(keyArray, true)
fc.BulkRemove(keyArray)

// it should be a cache miss
for n := 0; n < b.N; n++ {
Expand Down
40 changes: 27 additions & 13 deletions internal/cache/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type Index struct {
name string `msg:"-"`
cacheType string `msg:"-"`
config config.CacheIndexConfig `msg:"-"`
bulkRemoveFunc func([]string, bool) `msg:"-"`
bulkRemoveFunc func([]string) `msg:"-"`
reapInterval time.Duration `msg:"-"`
flushInterval time.Duration `msg:"-"`
flushFunc func(cacheKey string, data []byte) `msg:"-"`
Expand Down Expand Up @@ -92,7 +92,7 @@ func ObjectFromBytes(data []byte) (*Object, error) {
}

// NewIndex returns a new Index based on the provided inputs
func NewIndex(cacheName, cacheType string, indexData []byte, cfg config.CacheIndexConfig, bulkRemoveFunc func([]string, bool), flushFunc func(cacheKey string, data []byte)) *Index {
func NewIndex(cacheName, cacheType string, indexData []byte, cfg config.CacheIndexConfig, bulkRemoveFunc func([]string), flushFunc func(cacheKey string, data []byte)) *Index {
i := &Index{}

if len(indexData) > 0 {
Expand Down Expand Up @@ -183,25 +183,37 @@ func (idx *Index) UpdateObject(obj *Object) {
}

// RemoveObject removes an Object's Metadata from the Index
func (idx *Index) RemoveObject(key string, noLock bool) {

if !noLock {
indexLock.Lock()
idx.lastWrite = time.Now()
}
func (idx *Index) RemoveObject(key string) {
indexLock.Lock()
idx.lastWrite = time.Now()
if o, ok := idx.Objects[key]; ok {
idx.CacheSize -= o.Size
idx.ObjectCount--

cache.ObserveCacheOperation(idx.name, idx.cacheType, "del", "none", float64(o.Size))

delete(idx.Objects, key)
cache.ObserveCacheSizeChange(idx.name, idx.cacheType, idx.CacheSize, idx.ObjectCount)
}
indexLock.Unlock()
}

// RemoveObjects removes a list of Objects' Metadata from the Index
func (idx *Index) RemoveObjects(keys []string, noLock bool) {
if !noLock {
indexLock.Lock()
}
for _, key := range keys {
if o, ok := idx.Objects[key]; ok {
idx.CacheSize -= o.Size
idx.ObjectCount--
cache.ObserveCacheOperation(idx.name, idx.cacheType, "del", "none", float64(o.Size))
delete(idx.Objects, key)
cache.ObserveCacheSizeChange(idx.name, idx.cacheType, idx.CacheSize, idx.ObjectCount)
}
}
idx.lastWrite = time.Now()
if !noLock {
indexLock.Unlock()
}

}

// GetExpiration returns the cache index's expiration for the object of the given key
Expand Down Expand Up @@ -276,7 +288,8 @@ func (idx *Index) reap() {

if len(removals) > 0 {
cache.ObserveCacheEvent(idx.name, idx.cacheType, "eviction", "ttl")
idx.bulkRemoveFunc(removals, true)
go idx.bulkRemoveFunc(removals)
idx.RemoveObjects(removals, true)
cacheChanged = true
}

Expand Down Expand Up @@ -332,7 +345,8 @@ func (idx *Index) reap() {

if len(removals) > 0 {
cache.ObserveCacheEvent(idx.name, idx.cacheType, "eviction", evictionType)
idx.bulkRemoveFunc(removals, true)
go idx.bulkRemoveFunc(removals)
idx.RemoveObjects(removals, true)
cacheChanged = true
}

Expand Down
7 changes: 2 additions & 5 deletions internal/cache/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ func init() {

var testBulkIndex *Index

func testBulkRemoveFunc(cacheKeys []string, noLock bool) {
for _, cacheKey := range cacheKeys {
testBulkIndex.RemoveObject(cacheKey, noLock)
}
func testBulkRemoveFunc(cacheKeys []string) {
}
func fakeFlusherFunc(string, []byte) {}

Expand Down Expand Up @@ -219,7 +216,7 @@ func TestRemoveObject(t *testing.T) {
t.Errorf("test object missing from index")
}

idx.RemoveObject("test", false)
idx.RemoveObject("test")
if _, ok := idx.Objects["test"]; ok {
t.Errorf("test object should be missing from index")
}
Expand Down
18 changes: 13 additions & 5 deletions internal/cache/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (c *Cache) retrieve(cacheKey string, allowExpired bool, atime bool) (*index
if allowExpired || o.Expiration.IsZero() || o.Expiration.After(time.Now()) {
log.Debug("memory cache retrieve", log.Pairs{"cacheKey": cacheKey})
if atime {
c.Index.UpdateObjectAccessTime(cacheKey)
go c.Index.UpdateObjectAccessTime(cacheKey)
}
cache.ObserveCacheOperation(c.Name, c.Config.CacheType, "get", "hit", float64(len(o.Value)))
locks.Release(lockPrefix + cacheKey)
Expand All @@ -154,19 +154,27 @@ func (c *Cache) Remove(cacheKey string) {
c.remove(cacheKey, false)
}

func (c *Cache) remove(cacheKey string, noLock bool) {
func (c *Cache) remove(cacheKey string, isBulk bool) {
locks.Acquire(lockPrefix + cacheKey)
c.client.Delete(cacheKey)
c.Index.RemoveObject(cacheKey, noLock)
if !isBulk {
c.Index.RemoveObject(cacheKey)
}
cache.ObserveCacheDel(c.Name, c.Config.CacheType, 0)
locks.Release(lockPrefix + cacheKey)
}

// BulkRemove removes a list of objects from the cache
func (c *Cache) BulkRemove(cacheKeys []string, noLock bool) {
func (c *Cache) BulkRemove(cacheKeys []string) {
wg := &sync.WaitGroup{}
for _, cacheKey := range cacheKeys {
c.remove(cacheKey, noLock)
wg.Add(1)
go func(key string) {
c.remove(key, true)
wg.Done()
}(cacheKey)
}
wg.Wait()
}

// Close is not used for Cache, and is here to fully prototype the Cache Interface
Expand Down
4 changes: 2 additions & 2 deletions internal/cache/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func TestCache_BulkRemove(t *testing.T) {
t.Errorf("expected %s got %s", status.LookupStatusHit, ls)
}

mc.BulkRemove([]string{cacheKey}, true)
mc.BulkRemove([]string{cacheKey})

// it should be a cache miss
_, ls, err = mc.Retrieve(cacheKey, false)
Expand All @@ -369,7 +369,7 @@ func BenchmarkCache_BulkRemove(b *testing.B) {

mc := storeBenchmark(b)

mc.BulkRemove(keyArray, true)
mc.BulkRemove(keyArray)

// it should be a cache miss
for n := 0; n < b.N; n++ {
Expand Down
2 changes: 1 addition & 1 deletion internal/cache/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (c *Cache) SetTTL(cacheKey string, ttl time.Duration) {
}

// BulkRemove removes a list of objects from the cache. noLock is not used for Redis
func (c *Cache) BulkRemove(cacheKeys []string, noLock bool) {
func (c *Cache) BulkRemove(cacheKeys []string) {
log.Debug("redis cache bulk remove", log.Pairs{})
c.client.Del(cacheKeys...)
cache.ObserveCacheDel(c.Name, c.Config.CacheType, float64(len(cacheKeys)))
Expand Down
4 changes: 2 additions & 2 deletions internal/cache/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func TestCache_BulkRemove(t *testing.T) {
t.Errorf("expected %s got %s", status.LookupStatusHit, ls)
}

rc.BulkRemove([]string{cacheKey}, true)
rc.BulkRemove([]string{cacheKey})

// it should be a cache miss
_, ls, err = rc.Retrieve(cacheKey, false)
Expand All @@ -504,7 +504,7 @@ func BenchmarkCache_BulkRemove(b *testing.B) {
keyArray = append(keyArray, cacheKey+strconv.Itoa(n))
}

rc.BulkRemove(keyArray, true)
rc.BulkRemove(keyArray)

// it should be a cache miss
for n := 0; n < b.N; n++ {
Expand Down
10 changes: 5 additions & 5 deletions internal/proxy/engines/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,8 @@ func (tc *testCache) Retrieve(cacheKey string, allowExpired bool) ([]byte, statu
return nil, status.LookupStatusError, errTest
}

func (tc *testCache) SetTTL(cacheKey string, ttl time.Duration) {}
func (tc *testCache) Remove(cacheKey string) {}
func (tc *testCache) BulkRemove(cacheKeys []string, noLock bool) {}
func (tc *testCache) Close() error { return errTest }
func (tc *testCache) Configuration() *config.CachingConfig { return tc.configuration }
func (tc *testCache) SetTTL(cacheKey string, ttl time.Duration) {}
func (tc *testCache) Remove(cacheKey string) {}
func (tc *testCache) BulkRemove(cacheKeys []string) {}
func (tc *testCache) Close() error { return errTest }
func (tc *testCache) Configuration() *config.CachingConfig { return tc.configuration }
3 changes: 0 additions & 3 deletions internal/proxy/engines/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,9 +756,6 @@ func (c *TestClient) HealthHandler(w http.ResponseWriter, r *http.Request) {
}

func (c *TestClient) QueryRangeHandler(w http.ResponseWriter, r *http.Request) {

//rsc := request.NewResources(c.config, c.path

r.URL = c.BuildUpstreamURL(r)
DeltaProxyCacheRequest(w, r)
}
Expand Down
5 changes: 2 additions & 3 deletions internal/proxy/engines/deltaproxycache.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request) {
}
}

client.SetExtent(r, trq, &trq.Extent)
client.SetExtent(pr.upstreamRequest, trq, &trq.Extent)
key := oc.CacheKeyPrefix + "." + pr.DeriveCacheKey(trq.TemplateURL, "")

locks.Acquire(key)
Expand Down Expand Up @@ -229,7 +229,7 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request) {
go func(e *timeseries.Extent, rq *proxyRequest) {
defer wg.Done()
rq.Request = rq.WithContext(tctx.WithResources(r.Context(), request.NewResources(oc, pc, cc, cache, client)))
client.SetExtent(rq.Request, trq, e)
client.SetExtent(rq.upstreamRequest, trq, e)
body, resp, _ := rq.Fetch()
if resp.StatusCode == http.StatusOK && len(body) > 0 {
nts, err := client.UnmarshalTimeseries(body)
Expand Down Expand Up @@ -343,7 +343,6 @@ func DeltaProxyCacheRequest(w http.ResponseWriter, r *http.Request) {
} else {
cdata, err := client.MarshalTimeseries(cts)
if err != nil {
locks.Release(key)
return
}
doc.Body = cdata
Expand Down
Loading

0 comments on commit 41fbf79

Please sign in to comment.