Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Updated] LogCache should not cache on StoreLogs error #460

Merged
merged 6 commits into from
Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions log_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,17 @@ func (c *LogCache) StoreLog(log *Log) error {
}

func (c *LogCache) StoreLogs(logs []*Log) error {
// Insert the logs into the ring buffer
err := c.store.StoreLogs(logs)
// Insert the logs into the ring buffer, but only on success
if err != nil {
return fmt.Errorf("unable to store logs within log store, err: %q", err)
}
c.l.Lock()
for _, l := range logs {
c.cache[l.Index%uint64(len(c.cache))] = l
}
c.l.Unlock()

return c.store.StoreLogs(logs)
return nil
}

func (c *LogCache) FirstIndex() (uint64, error) {
Expand Down
60 changes: 60 additions & 0 deletions log_cache_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package raft

import (
"errors"
"strings"
"sync"
"testing"
)

Expand Down Expand Up @@ -86,3 +89,60 @@ func TestLogCache(t *testing.T) {
t.Fatalf("err: %v", err)
}
}

type errorStore struct {
LogStore
mu sync.Mutex
fail bool
failed int
failMax int
}

func (e *errorStore) StoreLogs(logs []*Log) error {
e.mu.Lock()
defer e.mu.Unlock()
if e.fail {
e.failed++
if e.failed <= e.failMax {
return errors.New("some error")
}
e.fail = false
}
return e.LogStore.StoreLogs(logs)
}

func (e *errorStore) failNext(count int) {
e.mu.Lock()
e.fail = true
e.failMax = count
e.mu.Unlock()
}

func TestLogCacheWithBackendStoreError(t *testing.T) {
var err error
store := NewInmemStore()
errStore := &errorStore{LogStore: store}
c, _ := NewLogCache(16, errStore)

for i := 0; i < 4; i++ {
log := &Log{Index: uint64(i) + 1}
store.StoreLog(log)
}
errStore.failNext(1)
log := &Log{Index: 5}
err = c.StoreLog(log)
if !strings.Contains(err.Error(), "some error") {
t.Fatalf("wanted: some error, got err=%v", err)
}

var out Log
for i := 1; i < 5; i++ {
if err := c.GetLog(uint64(i), &out); err != nil {
t.Fatalf("err: %v", err)
}
}
out = Log{}
if err = c.GetLog(5, &out); err != ErrLogNotFound {
t.Fatalf("Should have returned not found, got err=%v out=%+v", err, out)
schristoff marked this conversation as resolved.
Show resolved Hide resolved
}
}
50 changes: 50 additions & 0 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2176,6 +2176,56 @@ func TestRaft_GetConfigurationNoBootstrap(t *testing.T) {
}
}

func TestRaft_CacheLogWithStoreError(t *testing.T) {
c := MakeCluster(2, t, nil)
defer c.Close()

// Should be one leader
follower := c.Followers()[0]
leader := c.Leader()
c.EnsureLeader(t, leader.localAddr)

// There is no lock to protect this assignment I am afraid.
es := &errorStore{LogStore: follower.logs}
cl, _ := NewLogCache(100, es)
follower.logs = cl

// Commit some logs
for i := 0; i < 5; i++ {
future := leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0)
if err := future.Error(); err != nil {
c.FailNowf("[ERR] err: %v", err)
}
}

// Make the next fail
es.failNext(1)
leader.Apply([]byte("test6"), 0)

leader.Apply([]byte("test7"), 0)
future := leader.Apply([]byte("test8"), 0)

// Wait for the last future to apply
if err := future.Error(); err != nil {
c.FailNowf("[ERR] err: %v", err)
}

// Shutdown follower
if f := follower.Shutdown(); f.Error() != nil {
c.FailNowf("error shuting down follower: %v", f.Error())
}

// Try to restart the follower and make sure it does not fail with a LogNotFound error
_, trans := NewInmemTransport(follower.localAddr)
follower.logs = es.LogStore
conf := follower.config()
n, err := NewRaft(&conf, &MockFSM{}, follower.logs, follower.stable, follower.snapshots, trans)
if err != nil {
c.FailNowf("error restarting follower: %v", err)
}
n.Shutdown()
}
schristoff marked this conversation as resolved.
Show resolved Hide resolved

func TestRaft_ReloadConfig(t *testing.T) {
conf := inmemConfig(t)
c := MakeCluster(1, t, conf)
Expand Down