diff --git a/log_cache.go b/log_cache.go index 952e98c22..7328a1203 100644 --- a/log_cache.go +++ b/log_cache.go @@ -51,14 +51,17 @@ func (c *LogCache) StoreLog(log *Log) error { } func (c *LogCache) StoreLogs(logs []*Log) error { - // Insert the logs into the ring buffer + err := c.store.StoreLogs(logs) + // Insert the logs into the ring buffer, but only on success + if err != nil { + return fmt.Errorf("unable to store logs within log store, err: %q", err) + } c.l.Lock() for _, l := range logs { c.cache[l.Index%uint64(len(c.cache))] = l } c.l.Unlock() - - return c.store.StoreLogs(logs) + return nil } func (c *LogCache) FirstIndex() (uint64, error) { diff --git a/log_cache_test.go b/log_cache_test.go index 7569e78ee..95bfa0f9b 100644 --- a/log_cache_test.go +++ b/log_cache_test.go @@ -1,6 +1,9 @@ package raft import ( + "errors" + "strings" + "sync" "testing" ) @@ -86,3 +89,60 @@ func TestLogCache(t *testing.T) { t.Fatalf("err: %v", err) } } + +type errorStore struct { + LogStore + mu sync.Mutex + fail bool + failed int + failMax int +} + +func (e *errorStore) StoreLogs(logs []*Log) error { + e.mu.Lock() + defer e.mu.Unlock() + if e.fail { + e.failed++ + if e.failed <= e.failMax { + return errors.New("some error") + } + e.fail = false + } + return e.LogStore.StoreLogs(logs) +} + +func (e *errorStore) failNext(count int) { + e.mu.Lock() + e.fail = true + e.failMax = count + e.mu.Unlock() +} + +func TestLogCacheWithBackendStoreError(t *testing.T) { + var err error + store := NewInmemStore() + errStore := &errorStore{LogStore: store} + c, _ := NewLogCache(16, errStore) + + for i := 0; i < 4; i++ { + log := &Log{Index: uint64(i) + 1} + store.StoreLog(log) + } + errStore.failNext(1) + log := &Log{Index: 5} + err = c.StoreLog(log) + if !strings.Contains(err.Error(), "some error") { + t.Fatalf("wanted: some error, got err=%v", err) + } + + var out Log + for i := 1; i < 5; i++ { + if err := c.GetLog(uint64(i), &out); err != nil { + t.Fatalf("err: %v", err) + } + } + out = Log{} + if err = c.GetLog(5, &out); err != ErrLogNotFound { + t.Fatalf("Should have returned not found, got err=%v out=%+v", err, out) + } +} diff --git a/raft_test.go b/raft_test.go index 8394b2334..8cadaeba9 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2176,6 +2176,56 @@ func TestRaft_GetConfigurationNoBootstrap(t *testing.T) { } } +func TestRaft_CacheLogWithStoreError(t *testing.T) { + c := MakeCluster(2, t, nil) + defer c.Close() + + // Should be one leader + follower := c.Followers()[0] + leader := c.Leader() + c.EnsureLeader(t, leader.localAddr) + + // There is no lock to protect this assignment I am afraid. + es := &errorStore{LogStore: follower.logs} + cl, _ := NewLogCache(100, es) + follower.logs = cl + + // Commit some logs + for i := 0; i < 5; i++ { + future := leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0) + if err := future.Error(); err != nil { + c.FailNowf("[ERR] err: %v", err) + } + } + + // Make the next fail + es.failNext(1) + leader.Apply([]byte("test6"), 0) + + leader.Apply([]byte("test7"), 0) + future := leader.Apply([]byte("test8"), 0) + + // Wait for the last future to apply + if err := future.Error(); err != nil { + c.FailNowf("[ERR] err: %v", err) + } + + // Shutdown follower + if f := follower.Shutdown(); f.Error() != nil { + c.FailNowf("error shuting down follower: %v", f.Error()) + } + + // Try to restart the follower and make sure it does not fail with a LogNotFound error + _, trans := NewInmemTransport(follower.localAddr) + follower.logs = es.LogStore + conf := follower.config() + n, err := NewRaft(&conf, &MockFSM{}, follower.logs, follower.stable, follower.snapshots, trans) + if err != nil { + c.FailNowf("error restarting follower: %v", err) + } + n.Shutdown() +} + func TestRaft_ReloadConfig(t *testing.T) { conf := inmemConfig(t) c := MakeCluster(1, t, conf)