Skip to content

Commit

Permalink
Merge pull request #31351 from tschottdorf/backport2.0-29579
Browse files Browse the repository at this point in the history
backport-2.0: storage: return one entry less in Entries
  • Loading branch information
tbg authored Oct 15, 2018
2 parents f28c2de + 2536910 commit a403a48
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 39 deletions.
27 changes: 16 additions & 11 deletions pkg/storage/entry_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,16 @@ func (rec *raftEntryCache) getTerm(rangeID roachpb.RangeID, index uint64) (uint6
// getEntries returns entries between [lo, hi) for specified range.
// If any entries are returned for the specified indexes, they will
// start with index lo and proceed sequentially without gaps until
// 1) all entries exclusive of hi are fetched, 2) > maxBytes of
// entries data is fetched, or 3) a cache miss occurs.
// 1) all entries exclusive of hi are fetched, 2) fetching another entry
// would add up to more than maxBytes of data, or 3) a cache miss occurs.
// The returned size reflects the size of the returned entries.
func (rec *raftEntryCache) getEntries(
ents []raftpb.Entry, rangeID roachpb.RangeID, lo, hi, maxBytes uint64,
) ([]raftpb.Entry, uint64, uint64) {
) (_ []raftpb.Entry, size uint64, nextIndex uint64, exceededMaxBytes bool) {
rec.Lock()
defer rec.Unlock()
var bytes uint64
nextIndex := lo
nextIndex = lo

rec.fromKey = entryCacheKey{RangeID: rangeID, Index: lo}
rec.toKey = entryCacheKey{RangeID: rangeID, Index: hi}
Expand All @@ -155,16 +156,20 @@ func (rec *raftEntryCache) getEntries(
return true
}
ent := v.(*raftpb.Entry)
ents = append(ents, *ent)
bytes += uint64(ent.Size())
nextIndex++
if maxBytes > 0 && bytes > maxBytes {
return true
size := uint64(ent.Size())
if bytes+size > maxBytes {
exceededMaxBytes = true
if len(ents) > 0 {
return true
}
}
return false
nextIndex++
bytes += size
ents = append(ents, *ent)
return exceededMaxBytes
}, &rec.fromKey, &rec.toKey)

return ents, bytes, nextIndex
return ents, bytes, nextIndex, exceededMaxBytes
}

// delEntries deletes entries between [lo, hi) for specified range.
Expand Down
15 changes: 9 additions & 6 deletions pkg/storage/entry_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package storage

import (
"math"
"reflect"
"testing"

Expand All @@ -24,6 +25,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
)

const noLimit = math.MaxUint64

func newEntry(index, size uint64) raftpb.Entry {
return raftpb.Entry{
Index: index,
Expand All @@ -48,12 +51,12 @@ func verifyGet(
expEnts []raftpb.Entry,
expNextIndex uint64,
) {
ents, _, nextIndex := rec.getEntries(nil, rangeID, lo, hi, 0)
ents, _, nextIndex, _ := rec.getEntries(nil, rangeID, lo, hi, noLimit)
if !(len(expEnts) == 0 && len(ents) == 0) && !reflect.DeepEqual(expEnts, ents) {
t.Fatalf("expected entries %+v; got %+v", expEnts, ents)
}
if nextIndex != expNextIndex {
t.Fatalf("expected next index %d; got %d", nextIndex, expNextIndex)
t.Fatalf("expected next index %d; got %d", expNextIndex, nextIndex)
}
for _, e := range ents {
term, ok := rec.getTerm(rangeID, e.Index)
Expand Down Expand Up @@ -115,10 +118,10 @@ func TestEntryCacheClearTo(t *testing.T) {
rec.addEntries(rangeID, []raftpb.Entry{newEntry(2, 1)})
rec.addEntries(rangeID, []raftpb.Entry{newEntry(20, 1), newEntry(21, 1)})
rec.clearTo(rangeID, 21)
if ents, _, _ := rec.getEntries(nil, rangeID, 2, 21, 0); len(ents) != 0 {
if ents, _, _, _ := rec.getEntries(nil, rangeID, 2, 21, noLimit); len(ents) != 0 {
t.Errorf("expected no entries after clearTo")
}
if ents, _, _ := rec.getEntries(nil, rangeID, 21, 22, 0); len(ents) != 1 {
if ents, _, _, _ := rec.getEntries(nil, rangeID, 21, 22, noLimit); len(ents) != 1 {
t.Errorf("expected entry 22 to remain in the cache clearTo")
}
}
Expand All @@ -128,13 +131,13 @@ func TestEntryCacheEviction(t *testing.T) {
rangeID := roachpb.RangeID(1)
rec := newRaftEntryCache(100)
rec.addEntries(rangeID, []raftpb.Entry{newEntry(1, 40), newEntry(2, 40)})
ents, _, hi := rec.getEntries(nil, rangeID, 1, 3, 0)
ents, _, hi, _ := rec.getEntries(nil, rangeID, 1, 3, noLimit)
if len(ents) != 2 || hi != 3 {
t.Errorf("expected both entries; got %+v, %d", ents, hi)
}
// Add another entry to evict first.
rec.addEntries(rangeID, []raftpb.Entry{newEntry(3, 40)})
ents, _, hi = rec.getEntries(nil, rangeID, 2, 4, 0)
ents, _, hi, _ = rec.getEntries(nil, rangeID, 2, 4, noLimit)
if len(ents) != 2 || hi != 4 {
t.Errorf("expected only two entries; got %+v, %d", ents, hi)
}
Expand Down
20 changes: 12 additions & 8 deletions pkg/storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package storage
import (
"context"
"fmt"
"math"
"time"

"github.com/coreos/etcd/raft"
Expand Down Expand Up @@ -76,8 +77,7 @@ func (r *replicaRaftStorage) InitialState() (raftpb.HardState, raftpb.ConfState,

// Entries implements the raft.Storage interface. Note that maxBytes is advisory
// and this method will always return at least one entry even if it exceeds
// maxBytes. Passing maxBytes equal to zero disables size checking. Sideloaded
// proposals count towards maxBytes with their payloads inlined.
// maxBytes. Sideloaded proposals count towards maxBytes with their payloads inlined.
func (r *replicaRaftStorage) Entries(lo, hi, maxBytes uint64) ([]raftpb.Entry, error) {
readonly := r.store.Engine().NewReadOnly()
defer readonly.Close()
Expand Down Expand Up @@ -114,10 +114,11 @@ func entries(
}
ents := make([]raftpb.Entry, 0, n)

ents, size, hitIndex := eCache.getEntries(ents, rangeID, lo, hi, maxBytes)
ents, size, hitIndex, exceededMaxBytes := eCache.getEntries(ents, rangeID, lo, hi, maxBytes)

// Return results if the correct number of results came back or if
// we ran into the max bytes limit.
if uint64(len(ents)) == hi-lo || (maxBytes > 0 && size > maxBytes) {
if uint64(len(ents)) == hi-lo || exceededMaxBytes {
return ents, nil
}

Expand All @@ -130,7 +131,6 @@ func entries(
canCache := true

var ent raftpb.Entry
exceededMaxBytes := false
scanFunc := func(kv roachpb.KeyValue) (bool, error) {
if err := kv.Value.GetProto(&ent); err != nil {
return false, err
Expand Down Expand Up @@ -158,9 +158,13 @@ func entries(

// Note that we track the size of proposals with payloads inlined.
size += uint64(ent.Size())

if size > maxBytes {
exceededMaxBytes = true
if len(ents) > 0 {
return exceededMaxBytes, nil
}
}
ents = append(ents, ent)
exceededMaxBytes = maxBytes > 0 && size > maxBytes
return exceededMaxBytes, nil
}

Expand Down Expand Up @@ -274,7 +278,7 @@ func term(
) (uint64, error) {
// entries() accepts a `nil` sideloaded storage and will skip inlining of
// sideloaded entries. We only need the term, so this is what we do.
ents, err := entries(ctx, rsl, eng, rangeID, eCache, nil /* sideloaded */, i, i+1, 0)
ents, err := entries(ctx, rsl, eng, rangeID, eCache, nil /* sideloaded */, i, i+1, math.MaxUint64 /* maxBytes */)
if err == raft.ErrCompacted {
ts, err := rsl.LoadTruncatedState(ctx, eng)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/replica_sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func maybeInlineSideloadedRaftCommand(
// We could unmarshal this yet again, but if it's committed we
// are very likely to have appended it recently, in which case
// we can save work.
cachedSingleton, _, _ := entryCache.getEntries(
cachedSingleton, _, _, _ := entryCache.getEntries(
nil, rangeID, ent.Index, ent.Index+1, 1<<20,
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/replica_sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) {
if len(entries) != 1 {
t.Fatalf("no or too many entries returned from cache: %+v", entries)
}
ents, _, _ := tc.store.raftEntryCache.getEntries(nil, tc.repl.RangeID, sideloadedIndex, sideloadedIndex+1, 1<<20)
ents, _, _, _ := tc.store.raftEntryCache.getEntries(nil, tc.repl.RangeID, sideloadedIndex, sideloadedIndex+1, 1<<20)
if withSS {
// We passed the sideload storage, so we expect to get our
// inlined index back from the cache.
Expand Down
45 changes: 33 additions & 12 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6997,8 +6997,8 @@ func TestEntries(t *testing.T) {
}},
// Case 5: Get a single entry from cache.
{lo: indexes[5], hi: indexes[6], expResultCount: 1, expCacheCount: 1, setup: nil},
// Case 6: Use MaxUint64 instead of 0 for maxBytes.
{lo: indexes[5], hi: indexes[9], maxBytes: math.MaxUint64, expResultCount: 4, expCacheCount: 4, setup: nil},
// Case 6: Get range without size limitation. (Like case 4, without truncating).
{lo: indexes[5], hi: indexes[9], expResultCount: 4, expCacheCount: 4, setup: nil},
// Case 7: maxBytes is set low so only a single value should be
// returned.
{lo: indexes[5], hi: indexes[9], maxBytes: 1, expResultCount: 1, expCacheCount: 1, setup: nil},
Expand Down Expand Up @@ -7044,7 +7044,10 @@ func TestEntries(t *testing.T) {
if tc.setup != nil {
tc.setup()
}
cacheEntries, _, _ := repl.store.raftEntryCache.getEntries(nil, rangeID, tc.lo, tc.hi, tc.maxBytes)
if tc.maxBytes == 0 {
tc.maxBytes = math.MaxUint64
}
cacheEntries, _, _, hitLimit := repl.store.raftEntryCache.getEntries(nil, rangeID, tc.lo, tc.hi, tc.maxBytes)
if len(cacheEntries) != tc.expCacheCount {
t.Errorf("%d: expected cache count %d, got %d", i, tc.expCacheCount, len(cacheEntries))
}
Expand All @@ -7060,12 +7063,17 @@ func TestEntries(t *testing.T) {
}
if len(ents) != tc.expResultCount {
t.Errorf("%d: expected %d entries, got %d", i, tc.expResultCount, len(ents))
} else if tc.expResultCount > 0 {
expHitLimit := ents[len(ents)-1].Index < tc.hi-1
if hitLimit != expHitLimit {
t.Errorf("%d: unexpected hit limit: %t", i, hitLimit)
}
}
}

// Case 23: Lo must be less than or equal to hi.
repl.mu.Lock()
if _, err := repl.raftEntriesLocked(indexes[9], indexes[5], 0); err == nil {
if _, err := repl.raftEntriesLocked(indexes[9], indexes[5], math.MaxUint64); err == nil {
t.Errorf("23: error expected, got none")
}
repl.mu.Unlock()
Expand All @@ -7078,21 +7086,34 @@ func TestEntries(t *testing.T) {

repl.mu.Lock()
defer repl.mu.Unlock()
if _, err := repl.raftEntriesLocked(indexes[5], indexes[9], 0); err == nil {
if _, err := repl.raftEntriesLocked(indexes[5], indexes[9], math.MaxUint64); err == nil {
t.Errorf("24: error expected, got none")
}

// Case 25: don't hit the gap due to maxBytes.
ents, err := repl.raftEntriesLocked(indexes[5], indexes[9], 1)
if err != nil {
t.Errorf("25: expected no error, got %s", err)
// Case 25a: don't hit the gap due to maxBytes, cache populated.
{
ents, err := repl.raftEntriesLocked(indexes[5], indexes[9], 1)
if err != nil {
t.Errorf("25: expected no error, got %s", err)
}
if len(ents) != 1 {
t.Errorf("25: expected 1 entry, got %d", len(ents))
}
}
if len(ents) != 1 {
t.Errorf("25: expected 1 entry, got %d", len(ents))
// Case 25b: don't hit the gap due to maxBytes, cache cleared.
{
repl.store.raftEntryCache.delEntries(rangeID, indexes[5], indexes[5]+1)
ents, err := repl.raftEntriesLocked(indexes[5], indexes[9], 1)
if err != nil {
t.Errorf("25: expected no error, got %s", err)
}
if len(ents) != 1 {
t.Errorf("25: expected 1 entry, got %d", len(ents))
}
}

// Case 26: don't hit the gap due to truncation.
if _, err := repl.raftEntriesLocked(indexes[4], indexes[9], 0); err != raft.ErrCompacted {
if _, err := repl.raftEntriesLocked(indexes[4], indexes[9], math.MaxUint64); err != raft.ErrCompacted {
t.Errorf("26: expected error %s , got %s", raft.ErrCompacted, err)
}
}
Expand Down

0 comments on commit a403a48

Please sign in to comment.