From e24eb225f15679bbe54f91bfa7da3b00e59b9768 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Mon, 18 Feb 2019 07:46:05 +0100 Subject: [PATCH] Fix data corruption (shard.go) (#119) This PR contains a testcase which demonstrates corruption (intermittently). Sometimes leading to `panic`, and sometimes leading to data corruption of values. I thought that fixing the datarace suggested in #117 would solve it, but it seems I was wrong about that, there's some other underlying bug. I'll push a commit on top if I find it. --- bigcache_test.go | 63 +++++++++++++++++++++++++++++++++++++++ queue/bytes_queue.go | 34 +++++++++++++++++++-- queue/bytes_queue_test.go | 13 ++++++-- shard.go | 26 ++++++++++++++-- 4 files changed, 128 insertions(+), 8 deletions(-) diff --git a/bigcache_test.go b/bigcache_test.go index 187461ac..580777d7 100644 --- a/bigcache_test.go +++ b/bigcache_test.go @@ -1,7 +1,9 @@ package bigcache import ( + "bytes" "fmt" + "math/rand" "runtime" "sync" "testing" @@ -358,6 +360,67 @@ func TestCacheDel(t *testing.T) { assert.Len(t, cachedValue, 0) } +// TestCacheDelRandomly does simultaneous deletes, puts and gets, to check for corruption errors. +func TestCacheDelRandomly(t *testing.T) { + t.Parallel() + c := Config{ + Shards: 1, + LifeWindow: time.Second, + CleanWindow: 0, + MaxEntriesInWindow: 10, + MaxEntrySize: 10, + Verbose: true, + Hasher: newDefaultHasher(), + HardMaxCacheSize: 1, + Logger: DefaultLogger(), + } + //c.Hasher = hashStub(5) + cache, _ := NewBigCache(c) + var wg sync.WaitGroup + var ntest = 800000 + wg.Add(1) + go func() { + for i := 0; i < ntest; i++ { + r := uint8(rand.Int()) + key := fmt.Sprintf("thekey%d", r) + + cache.Delete(key) + } + wg.Done() + }() + wg.Add(1) + go func() { + val := make([]byte, 1024) + for i := 0; i < ntest; i++ { + r := byte(rand.Int()) + key := fmt.Sprintf("thekey%d", r) + + for j := 0; j < len(val); j++ { + val[j] = r + } + cache.Set(key, val) + } + wg.Done() + }() + wg.Add(1) + go func() { + val := make([]byte, 1024) + for i := 0; i < ntest; i++ { + r := byte(rand.Int()) + key := fmt.Sprintf("thekey%d", r) + + for j := 0; j < len(val); j++ { + val[j] = r + } + if got, err := cache.Get(key); err == nil && !bytes.Equal(got, val) { + t.Errorf("got %s ->\n %x\n expected:\n %x\n ", key, got, val) + } + } + wg.Done() + }() + wg.Wait() +} + func TestCacheReset(t *testing.T) { t.Parallel() diff --git a/queue/bytes_queue.go b/queue/bytes_queue.go index 0285c72c..bda73740 100644 --- a/queue/bytes_queue.go +++ b/queue/bytes_queue.go @@ -16,6 +16,12 @@ const ( minimumEmptyBlobSize = 32 + headerEntrySize ) +var ( + errEmptyQueue = &queueError{"Empty queue"} + errInvalidIndex = &queueError{"Index must be greater than zero. Invalid index."} + errIndexOutOfBounds = &queueError{"Index out of range"} +) + // BytesQueue is a non-thread safe queue type of fifo based on bytes array. // For every push operation index of entry is returned. It can be used to read the entry later type BytesQueue struct { @@ -162,6 +168,11 @@ func (q *BytesQueue) Get(index int) ([]byte, error) { return data, err } +// CheckGet checks if an entry can be read from index +func (q *BytesQueue) CheckGet(index int) error { + return q.peekCheckErr(index) +} + // Capacity returns number of allocated bytes for queue func (q *BytesQueue) Capacity() int { return q.capacity @@ -177,18 +188,35 @@ func (e *queueError) Error() string { return e.message } +// peekCheckErr is identical to peek, but does not actually return any data +func (q *BytesQueue) peekCheckErr(index int) error { + + if q.count == 0 { + return errEmptyQueue + } + + if index <= 0 { + return errInvalidIndex + } + + if index+headerEntrySize >= len(q.array) { + return errIndexOutOfBounds + } + return nil +} + func (q *BytesQueue) peek(index int) ([]byte, int, error) { if q.count == 0 { - return nil, 0, &queueError{"Empty queue"} + return nil, 0, errEmptyQueue } if index <= 0 { - return nil, 0, &queueError{"Index must be grater than zero. Invalid index."} + return nil, 0, errInvalidIndex } if index+headerEntrySize >= len(q.array) { - return nil, 0, &queueError{"Index out of range"} + return nil, 0, errIndexOutOfBounds } blockSize := int(binary.LittleEndian.Uint32(q.array[index : index+headerEntrySize])) diff --git a/queue/bytes_queue_test.go b/queue/bytes_queue_test.go index f4342b4e..9abde99a 100644 --- a/queue/bytes_queue_test.go +++ b/queue/bytes_queue_test.go @@ -50,16 +50,19 @@ func TestPeek(t *testing.T) { // when read, err := queue.Peek() - + err2 := queue.peekCheckErr(queue.head) // then + assert.Equal(t, err, err2) assert.EqualError(t, err, "Empty queue") assert.Nil(t, read) // when queue.Push(entry) read, err = queue.Peek() + err2 = queue.peekCheckErr(queue.head) // then + assert.Equal(t, err, err2) assert.NoError(t, err) assert.Equal(t, pop(queue), read) assert.Equal(t, entry, read) @@ -286,10 +289,12 @@ func TestGetEntryFromInvalidIndex(t *testing.T) { // when result, err := queue.Get(0) + err2 := queue.CheckGet(0) // then + assert.Equal(t, err, err2) assert.Nil(t, result) - assert.EqualError(t, err, "Index must be grater than zero. Invalid index.") + assert.EqualError(t, err, "Index must be greater than zero. Invalid index.") } func TestGetEntryFromIndexOutOfRange(t *testing.T) { @@ -301,8 +306,10 @@ func TestGetEntryFromIndexOutOfRange(t *testing.T) { // when result, err := queue.Get(42) + err2 := queue.CheckGet(42) // then + assert.Equal(t, err, err2) assert.Nil(t, result) assert.EqualError(t, err, "Index out of range") } @@ -315,8 +322,10 @@ func TestGetEntryFromEmptyQueue(t *testing.T) { // when result, err := queue.Get(1) + err2 := queue.CheckGet(1) // then + assert.Equal(t, err, err2) assert.Nil(t, result) assert.EqualError(t, err, "Empty queue") } diff --git a/shard.go b/shard.go index 67679ba1..a31094ff 100644 --- a/shard.go +++ b/shard.go @@ -49,9 +49,10 @@ func (s *cacheShard) get(key string, hashedKey uint64) ([]byte, error) { s.collision() return nil, ErrEntryNotFound } + entry := readEntry(wrappedEntry) s.lock.RUnlock() s.hit() - return readEntry(wrappedEntry), nil + return entry, nil } func (s *cacheShard) set(key string, hashedKey uint64, entry []byte) error { @@ -85,6 +86,7 @@ func (s *cacheShard) set(key string, hashedKey uint64, entry []byte) error { } func (s *cacheShard) del(key string, hashedKey uint64) error { + // Optimistic pre-check using only readlock s.lock.RLock() itemIndex := s.hashmap[hashedKey] @@ -94,8 +96,7 @@ func (s *cacheShard) del(key string, hashedKey uint64) error { return ErrEntryNotFound } - wrappedEntry, err := s.entries.Get(int(itemIndex)) - if err != nil { + if err := s.entries.CheckGet(int(itemIndex)); err != nil { s.lock.RUnlock() s.delmiss() return err @@ -104,6 +105,23 @@ func (s *cacheShard) del(key string, hashedKey uint64) error { s.lock.Lock() { + // After obtaining the writelock, we need to read the same again, + // since the data delivered earlier may be stale now + itemIndex = s.hashmap[hashedKey] + + if itemIndex == 0 { + s.lock.Unlock() + s.delmiss() + return ErrEntryNotFound + } + + wrappedEntry, err := s.entries.Get(int(itemIndex)) + if err != nil { + s.lock.Unlock() + s.delmiss() + return err + } + delete(s.hashmap, hashedKey) s.onRemove(wrappedEntry, Deleted) resetKeyFromEntry(wrappedEntry) @@ -136,6 +154,8 @@ func (s *cacheShard) cleanUp(currentTimestamp uint64) { } func (s *cacheShard) getOldestEntry() ([]byte, error) { + s.lock.RLock() + defer s.lock.RUnlock() return s.entries.Peek() }