diff --git a/auth/store.go b/auth/store.go index 5672a9440e1..46f58645151 100644 --- a/auth/store.go +++ b/auth/store.go @@ -26,6 +26,7 @@ import ( "go.etcd.io/etcd/auth/authpb" "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + "go.etcd.io/etcd/etcdserver/cindex" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/mvcc/backend" @@ -91,9 +92,6 @@ type AuthenticateParamIndex struct{} // AuthenticateParamSimpleTokenPrefix is used for a key of context in the parameters of Authenticate() type AuthenticateParamSimpleTokenPrefix struct{} -// saveConsistentIndexFunc is used to sync consistentIndex to backend, now reusing store.saveIndex -type saveConsistentIndexFunc func(tx backend.BatchTx) - // AuthStore defines auth storage interface. type AuthStore interface { // AuthEnable turns on the authentication feature @@ -186,9 +184,6 @@ type AuthStore interface { // HasRole checks that user has role HasRole(user, role string) bool - - // SetConsistentIndexSyncer sets consistentIndex syncer - SetConsistentIndexSyncer(syncer saveConsistentIndexFunc) } type TokenProvider interface { @@ -212,14 +207,11 @@ type authStore struct { rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions - tokenProvider TokenProvider - syncConsistentIndex saveConsistentIndexFunc - bcryptCost int // the algorithm cost / strength for hashing auth passwords + tokenProvider TokenProvider + bcryptCost int // the algorithm cost / strength for hashing auth passwords + ci cindex.ConsistentIndexer } -func (as *authStore) SetConsistentIndexSyncer(syncer saveConsistentIndexFunc) { - as.syncConsistentIndex = syncer -} func (as *authStore) AuthEnable() error { as.enabledMu.Lock() defer as.enabledMu.Unlock() @@ -1018,7 +1010,7 @@ func (as *authStore) IsAuthEnabled() bool { } // NewAuthStore creates a new AuthStore. -func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCost int) *authStore { +func NewAuthStore(lg *zap.Logger, be backend.Backend, ci cindex.ConsistentIndexer, tp TokenProvider, bcryptCost int) *authStore { if lg == nil { lg = zap.NewNop() } @@ -1053,6 +1045,7 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo revision: getRevision(tx), lg: lg, be: be, + ci: ci, enabled: enabled, rangePermCache: make(map[string]*unifiedRangePermissions), tokenProvider: tp, @@ -1314,10 +1307,10 @@ func (as *authStore) BcryptCost() int { } func (as *authStore) saveConsistentIndex(tx backend.BatchTx) { - if as.syncConsistentIndex != nil { - as.syncConsistentIndex(tx) + if as.ci != nil { + as.ci.UnsafeSave(tx) } else { - as.lg.Error("failed to save consistentIndex,syncConsistentIndex is nil") + as.lg.Error("failed to save consistentIndex,consistentIndexer is nil") } } diff --git a/auth/store_test.go b/auth/store_test.go index d523b190504..66cdeb2a61c 100644 --- a/auth/store_test.go +++ b/auth/store_test.go @@ -52,7 +52,7 @@ func TestNewAuthStoreRevision(t *testing.T) { if err != nil { t.Fatal(err) } - as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost) + as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost) err = enableAuthAndCreateRoot(as) if err != nil { t.Fatal(err) @@ -63,7 +63,7 @@ func TestNewAuthStoreRevision(t *testing.T) { // no changes to commit b2 := backend.NewDefaultBackend(tPath) - as = NewAuthStore(zap.NewExample(), b2, tp, bcrypt.MinCost) + as = NewAuthStore(zap.NewExample(), b2, nil, tp, bcrypt.MinCost) new := as.Revision() as.Close() b2.Close() @@ -85,7 +85,7 @@ func TestNewAuthStoreBcryptCost(t *testing.T) { invalidCosts := [2]int{bcrypt.MinCost - 1, bcrypt.MaxCost + 1} for _, invalidCost := range invalidCosts { - as := NewAuthStore(zap.NewExample(), b, tp, invalidCost) + as := NewAuthStore(zap.NewExample(), b, nil, tp, invalidCost) if as.BcryptCost() != bcrypt.DefaultCost { t.Fatalf("expected DefaultCost when bcryptcost is invalid") } @@ -102,7 +102,7 @@ func setupAuthStore(t *testing.T) (store *authStore, teardownfunc func(t *testin if err != nil { t.Fatal(err) } - as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost) + as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost) err = enableAuthAndCreateRoot(as) if err != nil { t.Fatal(err) @@ -703,7 +703,7 @@ func TestAuthInfoFromCtxRace(t *testing.T) { if err != nil { t.Fatal(err) } - as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost) + as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost) defer as.Close() donec := make(chan struct{}) @@ -769,7 +769,7 @@ func TestRecoverFromSnapshot(t *testing.T) { if err != nil { t.Fatal(err) } - as2 := NewAuthStore(zap.NewExample(), as.be, tp, bcrypt.MinCost) + as2 := NewAuthStore(zap.NewExample(), as.be, nil, tp, bcrypt.MinCost) defer func(a *authStore) { a.Close() }(as2) @@ -851,7 +851,7 @@ func TestRolesOrder(t *testing.T) { if err != nil { t.Fatal(err) } - as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost) + as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost) err = enableAuthAndCreateRoot(as) if err != nil { t.Fatal(err) @@ -906,7 +906,7 @@ func testAuthInfoFromCtxWithRoot(t *testing.T, opts string) { if err != nil { t.Fatal(err) } - as := NewAuthStore(zap.NewExample(), b, tp, bcrypt.MinCost) + as := NewAuthStore(zap.NewExample(), b, nil, tp, bcrypt.MinCost) defer as.Close() if err = enableAuthAndCreateRoot(as); err != nil { diff --git a/clientv3/snapshot/util.go b/clientv3/snapshot/util.go index 93ba70b6c61..2c1fae21fa1 100644 --- a/clientv3/snapshot/util.go +++ b/clientv3/snapshot/util.go @@ -14,7 +14,9 @@ package snapshot -import "encoding/binary" +import ( + "encoding/binary" +) type revision struct { main int64 @@ -27,9 +29,3 @@ func bytesToRev(bytes []byte) revision { sub: int64(binary.BigEndian.Uint64(bytes[9:])), } } - -// initIndex implements ConsistentIndexGetter so the snapshot won't block -// the new raft instance by waiting for a future raft index. -type initIndex int - -func (i *initIndex) ConsistentIndex() uint64 { return uint64(*i) } diff --git a/clientv3/snapshot/v3_snapshot.go b/clientv3/snapshot/v3_snapshot.go index 791035e7db8..5eefdbaaca0 100644 --- a/clientv3/snapshot/v3_snapshot.go +++ b/clientv3/snapshot/v3_snapshot.go @@ -34,6 +34,7 @@ import ( "go.etcd.io/etcd/etcdserver/api/membership" "go.etcd.io/etcd/etcdserver/api/snap" "go.etcd.io/etcd/etcdserver/api/v2store" + "go.etcd.io/etcd/etcdserver/cindex" "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc" @@ -384,7 +385,9 @@ func (s *v3Manager) saveDB() error { // a lessor never timeouts leases lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}) - mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32}) + ci := cindex.NewConsistentIndex(be.BatchTx()) + ci.SetConsistentIndex(uint64(commit)) + mvs := mvcc.NewStore(s.lg, be, lessor, ci, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32}) txn := mvs.Write(traceutil.TODO()) btx := be.BatchTx() del := func(k, v []byte) error { diff --git a/etcdserver/backend.go b/etcdserver/backend.go index 99b06a82596..a60a9d19624 100644 --- a/etcdserver/backend.go +++ b/etcdserver/backend.go @@ -20,6 +20,7 @@ import ( "time" "go.etcd.io/etcd/etcdserver/api/snap" + "go.etcd.io/etcd/etcdserver/cindex" "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc" "go.etcd.io/etcd/mvcc/backend" @@ -94,8 +95,8 @@ func openBackend(cfg ServerConfig) backend.Backend { // violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this // case, replace the db with the snapshot db sent by the leader. func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) { - var cIndex consistentIndex - kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, nil, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) + ci := cindex.NewConsistentIndex(oldbe.BatchTx()) + kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, ci, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) defer kv.Close() if snapshot.Metadata.Index <= kv.ConsistentIndex() { return oldbe, nil diff --git a/etcdserver/cindex/cindex.go b/etcdserver/cindex/cindex.go new file mode 100644 index 00000000000..6a83326aedf --- /dev/null +++ b/etcdserver/cindex/cindex.go @@ -0,0 +1,114 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cindex + +import ( + "encoding/binary" + "sync" + "sync/atomic" + + "go.etcd.io/etcd/mvcc/backend" +) + +var ( + metaBucketName = []byte("meta") + + consistentIndexKeyName = []byte("consistent_index") +) + +// ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex. +type ConsistentIndexer interface { + + // ConsistentIndex returns the consistent index of current executing entry. + ConsistentIndex() uint64 + + // SetConsistentIndex set the consistent index of current executing entry. + SetConsistentIndex(v uint64) + + // UnsafeSave must be called holding the lock on the tx. + // It saves consistentIndex to the underlying stable storage. + UnsafeSave(tx backend.BatchTx) + + // SetBatchTx set the available backend.BatchTx for ConsistentIndexer. + SetBatchTx(tx backend.BatchTx) +} + +// consistentIndex implements the ConsistentIndexer interface. +type consistentIndex struct { + tx backend.BatchTx + // consistentIndex represents the offset of an entry in a consistent replica log. + // it caches the "consistent_index" key's value. Accessed + // through atomics so must be 64-bit aligned. + consistentIndex uint64 + // bytesBuf8 is a byte slice of length 8 + // to avoid a repetitive allocation in saveIndex. + bytesBuf8 []byte + mutex sync.Mutex +} + +func NewConsistentIndex(tx backend.BatchTx) ConsistentIndexer { + return &consistentIndex{tx: tx, bytesBuf8: make([]byte, 8)} +} + +func (ci *consistentIndex) ConsistentIndex() uint64 { + + if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 { + return index + } + ci.mutex.Lock() + defer ci.mutex.Unlock() + ci.tx.Lock() + defer ci.tx.Unlock() + _, vs := ci.tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0) + if len(vs) == 0 { + return 0 + } + v := binary.BigEndian.Uint64(vs[0]) + atomic.StoreUint64(&ci.consistentIndex, v) + return v +} + +func (ci *consistentIndex) SetConsistentIndex(v uint64) { + atomic.StoreUint64(&ci.consistentIndex, v) +} + +func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) { + bs := ci.bytesBuf8 + binary.BigEndian.PutUint64(bs, ci.consistentIndex) + // put the index into the underlying backend + // tx has been locked in TxnBegin, so there is no need to lock it again + tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs) +} + +func (ci *consistentIndex) SetBatchTx(tx backend.BatchTx) { + ci.mutex.Lock() + defer ci.mutex.Unlock() + ci.tx = tx +} + +func NewFakeConsistentIndex(index uint64) ConsistentIndexer { + return &fakeConsistentIndex{index: index} +} + +type fakeConsistentIndex struct{ index uint64 } + +func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index } + +func (f *fakeConsistentIndex) SetConsistentIndex(index uint64) { + atomic.StoreUint64(&f.index, index) +} + +func (f *fakeConsistentIndex) UnsafeSave(tx backend.BatchTx) {} +func (f *fakeConsistentIndex) SetBatchTx(tx backend.BatchTx) {} diff --git a/etcdserver/cindex/cindex_test.go b/etcdserver/cindex/cindex_test.go new file mode 100644 index 00000000000..9d458157366 --- /dev/null +++ b/etcdserver/cindex/cindex_test.go @@ -0,0 +1,85 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cindex + +import ( + "math/rand" + "os" + "testing" + "time" + + "go.etcd.io/etcd/mvcc/backend" +) + +// TestConsistentIndex ensures that LoadConsistentIndex/Save/ConsistentIndex and backend.BatchTx can work well together. +func TestConsistentIndex(t *testing.T) { + + be, tmpPath := backend.NewTmpBackend(time.Microsecond, 10) + defer os.Remove(tmpPath) + ci := NewConsistentIndex(be.BatchTx()) + + tx := be.BatchTx() + if tx == nil { + t.Fatal("batch tx is nil") + } + tx.Lock() + tx.UnsafeCreateBucket(metaBucketName) + tx.Unlock() + be.ForceCommit() + r := rand.Uint64() + ci.SetConsistentIndex(r) + index := ci.ConsistentIndex() + if index != r { + t.Errorf("expected %d,got %d", r, index) + } + tx.Lock() + ci.UnsafeSave(tx) + tx.Unlock() + be.ForceCommit() + be.Close() + + b := backend.NewDefaultBackend(tmpPath) + ci.SetConsistentIndex(0) + ci.SetBatchTx(b.BatchTx()) + index = ci.ConsistentIndex() + if index != r { + t.Errorf("expected %d,got %d", r, index) + } + + ci = NewConsistentIndex(b.BatchTx()) + index = ci.ConsistentIndex() + if index != r { + t.Errorf("expected %d,got %d", r, index) + } + b.Close() + +} + +func TestFakeConsistentIndex(t *testing.T) { + + r := rand.Uint64() + ci := NewFakeConsistentIndex(r) + index := ci.ConsistentIndex() + if index != r { + t.Errorf("expected %d,got %d", r, index) + } + r = rand.Uint64() + ci.SetConsistentIndex(r) + index = ci.ConsistentIndex() + if index != r { + t.Errorf("expected %d,got %d", r, index) + } + +} diff --git a/etcdserver/consistent_index_test.go b/etcdserver/cindex/doc.go similarity index 69% rename from etcdserver/consistent_index_test.go rename to etcdserver/cindex/doc.go index 0d74fca8a75..7d3e4b774e5 100644 --- a/etcdserver/consistent_index_test.go +++ b/etcdserver/cindex/doc.go @@ -1,4 +1,4 @@ -// Copyright 2015 The etcd Authors +// Copyright 2016 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,14 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. -package etcdserver - -import "testing" - -func TestConsistentIndex(t *testing.T) { - var i consistentIndex - i.setConsistentIndex(10) - if g := i.ConsistentIndex(); g != 10 { - t.Errorf("value = %d, want 10", g) - } -} +// Package cindex provides an interface and implementation for getting/saving consistentIndex. +package cindex diff --git a/etcdserver/consistent_index.go b/etcdserver/consistent_index.go deleted file mode 100644 index d513f6708d3..00000000000 --- a/etcdserver/consistent_index.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2015 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package etcdserver - -import ( - "sync/atomic" -) - -// consistentIndex represents the offset of an entry in a consistent replica log. -// It implements the mvcc.ConsistentIndexGetter interface. -// It is always set to the offset of current entry before executing the entry, -// so ConsistentWatchableKV could get the consistent index from it. -type consistentIndex uint64 - -func (i *consistentIndex) setConsistentIndex(v uint64) { - atomic.StoreUint64((*uint64)(i), v) -} - -func (i *consistentIndex) ConsistentIndex() uint64 { - return atomic.LoadUint64((*uint64)(i)) -} diff --git a/etcdserver/server.go b/etcdserver/server.go index a4fe18c14ae..1c978323271 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -41,6 +41,7 @@ import ( "go.etcd.io/etcd/etcdserver/api/v2store" "go.etcd.io/etcd/etcdserver/api/v3alarm" "go.etcd.io/etcd/etcdserver/api/v3compactor" + "go.etcd.io/etcd/etcdserver/cindex" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/lease" "go.etcd.io/etcd/lease/leasehttp" @@ -191,10 +192,8 @@ type EtcdServer struct { term uint64 // must use atomic operations to access; keep 64-bit aligned. lead uint64 // must use atomic operations to access; keep 64-bit aligned. - // consistIndex used to hold the offset of current executing entry - // It is initialized to 0 before executing any entry. - consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned. - r raftNode // uses 64-bit atomics; keep 64-bit aligned. + consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex + r raftNode // uses 64-bit atomics; keep 64-bit aligned. readych chan struct{} Cfg ServerConfig @@ -496,6 +495,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { reqIDGen: idutil.NewGenerator(uint16(id), time.Now()), forceVersionC: make(chan struct{}), AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, + consistIndex: cindex.NewConsistentIndex(be.BatchTx()), } serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1) @@ -524,11 +524,11 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { cfg.Logger.Warn("failed to create token provider", zap.Error(err)) return nil, err } - srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, tp, int(cfg.BcryptCost)) - - srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.authStore, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) + srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) + kvindex := srv.consistIndex.ConsistentIndex() + srv.lg.Debug("restore consistentIndex", + zap.Uint64("index", kvindex)) if beExist { - kvindex := srv.kv.ConsistentIndex() // TODO: remove kvindex != 0 checking when we do not expect users to upgrade // etcd from pre-3.0 release. if snapshot != nil && kvindex < snapshot.Metadata.Index { @@ -541,6 +541,9 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { ) } } + + srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, srv.consistIndex, tp, int(cfg.BcryptCost)) + newSrv := srv // since srv == nil in defer if srv is returned as nil defer func() { // closing backend without first closing kv can cause @@ -549,9 +552,6 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { newSrv.kv.Close() } }() - - srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex()) - if num := cfg.AutoCompactionRetention; num != 0 { srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv) if err != nil { @@ -1095,7 +1095,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { lg.Panic("failed to restore mvcc store", zap.Error(err)) } - s.consistIndex.setConsistentIndex(s.kv.ConsistentIndex()) + s.consistIndex.SetConsistentIndex(s.kv.ConsistentIndex()) lg.Info("restored mvcc store") // Closing old backend might block until all the txns @@ -1938,7 +1938,7 @@ func (s *EtcdServer) apply( case raftpb.EntryConfChange: // set the consistent index of current executing entry if e.Index > s.consistIndex.ConsistentIndex() { - s.consistIndex.setConsistentIndex(e.Index) + s.consistIndex.SetConsistentIndex(e.Index) } var cc raftpb.ConfChange pbutil.MustUnmarshal(&cc, e.Data) @@ -1963,11 +1963,16 @@ func (s *EtcdServer) apply( // applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { shouldApplyV3 := false - if e.Index > s.consistIndex.ConsistentIndex() { + index := s.consistIndex.ConsistentIndex() + if e.Index > index { // set the consistent index of current executing entry - s.consistIndex.setConsistentIndex(e.Index) + s.consistIndex.SetConsistentIndex(e.Index) shouldApplyV3 = true } + s.lg.Debug("apply entry normal", + zap.Uint64("consistent-index", index), + zap.Uint64("entry-index", e.Index), + zap.Bool("should-applyV3", shouldApplyV3)) // raft state machine may generate noop entry when leader confirmation. // skip it in advance to avoid some potential bug in the future @@ -1997,7 +2002,6 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { s.w.Trigger(req.ID, s.applyV2Request(req)) return } - // do not re-apply applied entries. if !shouldApplyV3 { return diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 3a5f4fdd84a..4b30b098b05 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -31,6 +31,7 @@ import ( "go.etcd.io/etcd/etcdserver/api/rafthttp" "go.etcd.io/etcd/etcdserver/api/snap" "go.etcd.io/etcd/etcdserver/api/v2store" + "go.etcd.io/etcd/etcdserver/cindex" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc" @@ -184,13 +185,14 @@ func TestApplyRepeat(t *testing.T) { transport: newNopTransporter(), }) s := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zap.NewExample(), - r: *r, - v2store: st, - cluster: cl, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - SyncTicker: &time.Ticker{}, + lgMu: new(sync.RWMutex), + lg: zap.NewExample(), + r: *r, + v2store: st, + cluster: cl, + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, + consistIndex: cindex.NewFakeConsistentIndex(0), } s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster} s.start() @@ -640,12 +642,13 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { transport: newNopTransporter(), }) srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zap.NewExample(), - id: 1, - r: *r, - cluster: cl, - w: wait.New(), + lgMu: new(sync.RWMutex), + lg: zap.NewExample(), + id: 1, + r: *r, + cluster: cl, + w: wait.New(), + consistIndex: cindex.NewFakeConsistentIndex(0), } // create EntryConfChange entry @@ -688,12 +691,13 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) { transport: newNopTransporter(), }) srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zap.NewExample(), - id: 2, - r: *r, - cluster: cl, - w: wait.New(), + lgMu: new(sync.RWMutex), + lg: zap.NewExample(), + id: 2, + r: *r, + cluster: cl, + w: wait.New(), + consistIndex: cindex.NewFakeConsistentIndex(0), } ents := []raftpb.Entry{} for i := 1; i <= 4; i++ { @@ -732,13 +736,14 @@ func TestDoProposal(t *testing.T) { transport: newNopTransporter(), }) srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, - r: *r, - v2store: st, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - SyncTicker: &time.Ticker{}, + lgMu: new(sync.RWMutex), + lg: zap.NewExample(), + Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, + r: *r, + v2store: st, + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, + consistIndex: cindex.NewFakeConsistentIndex(0), } srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} srv.start() @@ -978,12 +983,13 @@ func TestSnapshot(t *testing.T) { storage: p, }) srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zap.NewExample(), - r: *r, - v2store: st, + lgMu: new(sync.RWMutex), + lg: zap.NewExample(), + r: *r, + v2store: st, + consistIndex: cindex.NewConsistentIndex(be.BatchTx()), } - srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{}) + srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, srv.consistIndex, mvcc.StoreConfig{}) srv.be = be ch := make(chan struct{}, 2) @@ -1050,21 +1056,22 @@ func TestSnapshotOrdering(t *testing.T) { storage: p, raftStorage: rs, }) + be, tmpPath := backend.NewDefaultTmpBackend() + defer os.RemoveAll(tmpPath) s := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, - r: *r, - v2store: st, - snapshotter: snap.New(zap.NewExample(), snapdir), - cluster: cl, - SyncTicker: &time.Ticker{}, + lgMu: new(sync.RWMutex), + lg: zap.NewExample(), + Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, + r: *r, + v2store: st, + snapshotter: snap.New(zap.NewExample(), snapdir), + cluster: cl, + SyncTicker: &time.Ticker{}, + consistIndex: cindex.NewConsistentIndex(be.BatchTx()), } s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster} - be, tmpPath := backend.NewDefaultTmpBackend() - defer os.RemoveAll(tmpPath) - s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{}) + s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, s.consistIndex, mvcc.StoreConfig{}) s.be = be s.start() @@ -1115,17 +1122,18 @@ func TestTriggerSnap(t *testing.T) { transport: newNopTransporter(), }) srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCount: uint64(snapc), SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, - r: *r, - v2store: st, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - SyncTicker: &time.Ticker{}, + lgMu: new(sync.RWMutex), + lg: zap.NewExample(), + Cfg: ServerConfig{Logger: zap.NewExample(), TickMs: 1, SnapshotCount: uint64(snapc), SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, + r: *r, + v2store: st, + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, + consistIndex: cindex.NewConsistentIndex(be.BatchTx()), } srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} - srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{}) + srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, srv.consistIndex, mvcc.StoreConfig{}) srv.be = be srv.start() @@ -1181,23 +1189,24 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { storage: mockstorage.NewStorageRecorder(testdir), raftStorage: rs, }) - s := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zap.NewExample(), - Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, - r: *r, - v2store: st, - snapshotter: snap.New(zap.NewExample(), testdir), - cluster: cl, - SyncTicker: &time.Ticker{}, - } - s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster} - be, tmpPath := backend.NewDefaultTmpBackend() defer func() { os.RemoveAll(tmpPath) }() - s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{}) + s := &EtcdServer{ + lgMu: new(sync.RWMutex), + lg: zap.NewExample(), + Cfg: ServerConfig{Logger: zap.NewExample(), DataDir: testdir, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, + r: *r, + v2store: st, + snapshotter: snap.New(zap.NewExample(), testdir), + cluster: cl, + SyncTicker: &time.Ticker{}, + consistIndex: cindex.NewConsistentIndex(be.BatchTx()), + } + s.applyV2 = &applierV2store{store: s.v2store, cluster: s.cluster} + + s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, s.consistIndex, mvcc.StoreConfig{}) s.be = be s.start() @@ -1269,13 +1278,14 @@ func TestAddMember(t *testing.T) { transport: newNopTransporter(), }) s := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zap.NewExample(), - r: *r, - v2store: st, - cluster: cl, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - SyncTicker: &time.Ticker{}, + lgMu: new(sync.RWMutex), + lg: zap.NewExample(), + r: *r, + v2store: st, + cluster: cl, + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, + consistIndex: cindex.NewFakeConsistentIndex(0), } s.start() m := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"foo"}}} @@ -1313,13 +1323,14 @@ func TestRemoveMember(t *testing.T) { transport: newNopTransporter(), }) s := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zap.NewExample(), - r: *r, - v2store: st, - cluster: cl, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - SyncTicker: &time.Ticker{}, + lgMu: new(sync.RWMutex), + lg: zap.NewExample(), + r: *r, + v2store: st, + cluster: cl, + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, + consistIndex: cindex.NewFakeConsistentIndex(0), } s.start() _, err := s.RemoveMember(context.TODO(), 1234) @@ -1356,13 +1367,14 @@ func TestUpdateMember(t *testing.T) { transport: newNopTransporter(), }) s := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zap.NewExample(), - r: *r, - v2store: st, - cluster: cl, - reqIDGen: idutil.NewGenerator(0, time.Time{}), - SyncTicker: &time.Ticker{}, + lgMu: new(sync.RWMutex), + lg: zap.NewExample(), + r: *r, + v2store: st, + cluster: cl, + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, + consistIndex: cindex.NewFakeConsistentIndex(0), } s.start() wm := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}} diff --git a/integration/v3_alarm_test.go b/integration/v3_alarm_test.go index 0b2dd05ceec..ee02531d4b2 100644 --- a/integration/v3_alarm_test.go +++ b/integration/v3_alarm_test.go @@ -23,6 +23,7 @@ import ( "time" "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + "go.etcd.io/etcd/etcdserver/cindex" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/mvcc" "go.etcd.io/etcd/mvcc/backend" @@ -145,10 +146,6 @@ func TestV3AlarmDeactivate(t *testing.T) { } } -type fakeConsistentIndex struct{ rev uint64 } - -func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.rev } - func TestV3CorruptAlarm(t *testing.T) { defer testutil.AfterTest(t) clus := NewClusterV3(t, &ClusterConfig{Size: 3}) @@ -170,7 +167,7 @@ func TestV3CorruptAlarm(t *testing.T) { clus.Members[0].Stop(t) fp := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db") be := backend.NewDefaultBackend(fp) - s := mvcc.NewStore(zap.NewExample(), be, nil, &fakeConsistentIndex{13}, mvcc.StoreConfig{}) + s := mvcc.NewStore(zap.NewExample(), be, nil, cindex.NewFakeConsistentIndex(13), mvcc.StoreConfig{}) // NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'. s.Put([]byte("abc"), []byte("def"), 0) s.Put([]byte("xyz"), []byte("123"), 0) diff --git a/mvcc/kv_test.go b/mvcc/kv_test.go index 06f82636b63..466040790ff 100644 --- a/mvcc/kv_test.go +++ b/mvcc/kv_test.go @@ -712,7 +712,7 @@ func TestKVSnapshot(t *testing.T) { func TestWatchableKVWatch(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index 7d3a54e3359..79b7c686c8f 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -16,15 +16,14 @@ package mvcc import ( "context" - "encoding/binary" "errors" "fmt" "hash/crc32" "math" "sync" - "sync/atomic" "time" + "go.etcd.io/etcd/etcdserver/cindex" "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/mvcc/mvccpb" @@ -59,13 +58,6 @@ const ( var restoreChunkKeys = 10000 // non-const for testing var defaultCompactBatchLimit = 1000 -// ConsistentIndexGetter is an interface that wraps the Get method. -// Consistent index is the offset of an entry in a consistent replicated log. -type ConsistentIndexGetter interface { - // ConsistentIndex returns the consistent index of current executing entry. - ConsistentIndex() uint64 -} - type StoreConfig struct { CompactionBatchLimit int } @@ -74,16 +66,12 @@ type store struct { ReadView WriteView - // consistentIndex caches the "consistent_index" key's value. Accessed - // through atomics so must be 64-bit aligned. - consistentIndex uint64 - cfg StoreConfig // mu read locks for txns and write locks for non-txn store changes. mu sync.RWMutex - ig ConsistentIndexGetter + ci cindex.ConsistentIndexer b backend.Backend kvindex index @@ -99,10 +87,6 @@ type store struct { // compactMainRev is the main revision of the last compaction. compactMainRev int64 - // bytesBuf8 is a byte slice of length 8 - // to avoid a repetitive allocation in saveIndex. - bytesBuf8 []byte - fifoSched schedule.Scheduler stopc chan struct{} @@ -112,7 +96,7 @@ type store struct { // NewStore returns a new store. It is useful to create a store inside // mvcc pkg. It should only be used for testing externally. -func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *store { +func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) *store { if lg == nil { lg = zap.NewNop() } @@ -122,7 +106,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentI s := &store{ cfg: cfg, b: b, - ig: ig, + ci: ci, kvindex: newTreeIndex(lg), le: le, @@ -130,7 +114,6 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentI currentRev: 1, compactMainRev: -1, - bytesBuf8: make([]byte, 8), fifoSched: schedule.NewFIFOScheduler(), stopc: make(chan struct{}), @@ -344,13 +327,14 @@ func (s *store) Restore(b backend.Backend) error { close(s.stopc) s.fifoSched.Stop() - atomic.StoreUint64(&s.consistentIndex, 0) s.b = b s.kvindex = newTreeIndex(s.lg) s.currentRev = 1 s.compactMainRev = -1 s.fifoSched = schedule.NewFIFOScheduler() s.stopc = make(chan struct{}) + s.ci.SetBatchTx(b.BatchTx()) + s.ci.SetConsistentIndex(0) return s.restore() } @@ -529,32 +513,16 @@ func (s *store) Close() error { } func (s *store) saveIndex(tx backend.BatchTx) { - if s.ig == nil { - return + if s.ci != nil { + s.ci.UnsafeSave(tx) } - bs := s.bytesBuf8 - ci := s.ig.ConsistentIndex() - binary.BigEndian.PutUint64(bs, ci) - // put the index into the underlying backend - // tx has been locked in TxnBegin, so there is no need to lock it again - tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs) - atomic.StoreUint64(&s.consistentIndex, ci) } func (s *store) ConsistentIndex() uint64 { - if ci := atomic.LoadUint64(&s.consistentIndex); ci > 0 { - return ci - } - tx := s.b.BatchTx() - tx.Lock() - defer tx.Unlock() - _, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0) - if len(vs) == 0 { - return 0 + if s.ci != nil { + return s.ci.ConsistentIndex() } - v := binary.BigEndian.Uint64(vs[0]) - atomic.StoreUint64(&s.consistentIndex, v) - return v + return 0 } func (s *store) setupMetricsReporter() { diff --git a/mvcc/kvstore_bench_test.go b/mvcc/kvstore_bench_test.go index e6a4af84090..e8777c14a9a 100644 --- a/mvcc/kvstore_bench_test.go +++ b/mvcc/kvstore_bench_test.go @@ -15,9 +15,9 @@ package mvcc import ( - "sync/atomic" "testing" + "go.etcd.io/etcd/etcdserver/cindex" "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/pkg/traceutil" @@ -25,16 +25,9 @@ import ( "go.uber.org/zap" ) -type fakeConsistentIndex uint64 - -func (i *fakeConsistentIndex) ConsistentIndex() uint64 { - return atomic.LoadUint64((*uint64)(i)) -} - func BenchmarkStorePut(b *testing.B) { - var i fakeConsistentIndex be, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{}) + s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{}) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -52,9 +45,8 @@ func BenchmarkStoreRangeKey1(b *testing.B) { benchmarkStoreRange(b, 1) } func BenchmarkStoreRangeKey100(b *testing.B) { benchmarkStoreRange(b, 100) } func benchmarkStoreRange(b *testing.B, n int) { - var i fakeConsistentIndex be, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{}) + s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{}) defer cleanup(s, be, tmpPath) // 64 byte key/val @@ -80,9 +72,8 @@ func benchmarkStoreRange(b *testing.B, n int) { } func BenchmarkConsistentIndex(b *testing.B) { - fci := fakeConsistentIndex(10) be, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &fci, StoreConfig{}) + s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{}) defer cleanup(s, be, tmpPath) tx := s.b.BatchTx() @@ -99,9 +90,8 @@ func BenchmarkConsistentIndex(b *testing.B) { // BenchmarkStoreTxnPutUpdate is same as above, but instead updates single key func BenchmarkStorePutUpdate(b *testing.B) { - var i fakeConsistentIndex be, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{}) + s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{}) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -118,9 +108,8 @@ func BenchmarkStorePutUpdate(b *testing.B) { // with transaction begin and end, where transaction involves // some synchronization operations, such as mutex locking. func BenchmarkStoreTxnPut(b *testing.B) { - var i fakeConsistentIndex be, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{}) + s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{}) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -139,9 +128,8 @@ func BenchmarkStoreTxnPut(b *testing.B) { // benchmarkStoreRestore benchmarks the restore operation func benchmarkStoreRestore(revsPerKey int, b *testing.B) { - var i fakeConsistentIndex be, tmpPath := backend.NewDefaultTmpBackend() - s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{}) + s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{}) // use closure to capture 's' to pick up the reassignment defer func() { cleanup(s, be, tmpPath) }() @@ -161,7 +149,7 @@ func benchmarkStoreRestore(revsPerKey int, b *testing.B) { b.ReportAllocs() b.ResetTimer() - s = NewStore(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{}) + s = NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{}) } func BenchmarkStoreRestoreRevs1(b *testing.B) { diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index 0529993c658..6a73c5ee024 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -15,14 +15,15 @@ package mvcc import ( - "go.etcd.io/etcd/auth" "sync" "time" + "go.etcd.io/etcd/etcdserver/cindex" "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/mvcc/mvccpb" "go.etcd.io/etcd/pkg/traceutil" + "go.uber.org/zap" ) @@ -70,16 +71,16 @@ type watchableStore struct { // cancel operations. type cancelFunc func() -func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV { - return newWatchableStore(lg, b, le, as, ig, cfg) +func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) ConsistentWatchableKV { + return newWatchableStore(lg, b, le, ci, cfg) } -func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore { +func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) *watchableStore { if lg == nil { lg = zap.NewNop() } s := &watchableStore{ - store: NewStore(lg, b, le, ig, cfg), + store: NewStore(lg, b, le, ci, cfg), victimc: make(chan struct{}, 1), unsynced: newWatcherGroup(), synced: newWatcherGroup(), @@ -91,10 +92,6 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, as au // use this store as the deleter so revokes trigger watch events s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) }) } - if as != nil { - // TODO: encapsulating consistentindex into a separate package - as.SetConsistentIndexSyncer(s.store.saveIndex) - } s.wg.Add(2) go s.syncWatchersLoop() go s.syncVictimsLoop() diff --git a/mvcc/watchable_store_bench_test.go b/mvcc/watchable_store_bench_test.go index 1b0581b9820..60455166c53 100644 --- a/mvcc/watchable_store_bench_test.go +++ b/mvcc/watchable_store_bench_test.go @@ -19,6 +19,7 @@ import ( "os" "testing" + "go.etcd.io/etcd/etcdserver/cindex" "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/pkg/traceutil" @@ -28,7 +29,7 @@ import ( func BenchmarkWatchableStorePut(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{}) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -47,9 +48,8 @@ func BenchmarkWatchableStorePut(b *testing.B) { // with transaction begin and end, where transaction involves // some synchronization operations, such as mutex locking. func BenchmarkWatchableStoreTxnPut(b *testing.B) { - var i fakeConsistentIndex be, tmpPath := backend.NewDefaultTmpBackend() - s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &i, StoreConfig{}) + s := New(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{}) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -80,7 +80,7 @@ func BenchmarkWatchableStoreWatchPutUnsync(b *testing.B) { func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) { be, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{}) defer cleanup(s, be, tmpPath) k := []byte("testkey") @@ -180,7 +180,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { func BenchmarkWatchableStoreSyncedCancel(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{}) defer func() { s.store.Close() diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go index 67d0611f2cf..b0623e251e9 100644 --- a/mvcc/watchable_store_test.go +++ b/mvcc/watchable_store_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "go.etcd.io/etcd/etcdserver/cindex" "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/mvcc/mvccpb" @@ -32,7 +33,7 @@ import ( func TestWatch(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) defer func() { s.store.Close() @@ -54,7 +55,7 @@ func TestWatch(t *testing.T) { func TestNewWatcherCancel(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) defer func() { s.store.Close() @@ -224,7 +225,7 @@ func TestSyncWatchers(t *testing.T) { // TestWatchCompacted tests a watcher that watches on a compacted revision. func TestWatchCompacted(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) defer func() { s.store.Close() @@ -261,7 +262,7 @@ func TestWatchCompacted(t *testing.T) { func TestWatchFutureRev(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) defer func() { s.store.Close() @@ -302,7 +303,7 @@ func TestWatchRestore(t *testing.T) { test := func(delay time.Duration) func(t *testing.T) { return func(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, cindex.NewConsistentIndex(b.BatchTx()), StoreConfig{}) defer cleanup(s, b, tmpPath) testKey := []byte("foo") @@ -310,7 +311,7 @@ func TestWatchRestore(t *testing.T) { rev := s.Put(testKey, testValue, lease.NoLease) newBackend, newPath := backend.NewDefaultTmpBackend() - newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, cindex.NewConsistentIndex(newBackend.BatchTx()), StoreConfig{}) defer cleanup(newStore, newBackend, newPath) w := newStore.NewWatchStream() @@ -348,11 +349,11 @@ func TestWatchRestore(t *testing.T) { // 5. choose the watcher from step 1, without panic func TestWatchRestoreSyncedWatcher(t *testing.T) { b1, b1Path := backend.NewDefaultTmpBackend() - s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, cindex.NewConsistentIndex(b1.BatchTx()), StoreConfig{}) defer cleanup(s1, b1, b1Path) b2, b2Path := backend.NewDefaultTmpBackend() - s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, cindex.NewConsistentIndex(b2.BatchTx()), StoreConfig{}) defer cleanup(s2, b2, b2Path) testKey, testValue := []byte("foo"), []byte("bar") @@ -399,7 +400,7 @@ func TestWatchRestoreSyncedWatcher(t *testing.T) { // TestWatchBatchUnsynced tests batching on unsynced watchers func TestWatchBatchUnsynced(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) oldMaxRevs := watchBatchMaxRevs defer func() { @@ -533,7 +534,7 @@ func TestWatchVictims(t *testing.T) { oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) defer func() { s.store.Close() @@ -611,7 +612,7 @@ func TestWatchVictims(t *testing.T) { // canceling its watches. func TestStressWatchCancelClose(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) defer func() { s.store.Close() diff --git a/mvcc/watcher_bench_test.go b/mvcc/watcher_bench_test.go index f2e6c6f6e96..901a1ec0d83 100644 --- a/mvcc/watcher_bench_test.go +++ b/mvcc/watcher_bench_test.go @@ -26,7 +26,7 @@ import ( func BenchmarkKVWatcherMemoryUsage(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{}) defer cleanup(watchable, be, tmpPath) diff --git a/mvcc/watcher_test.go b/mvcc/watcher_test.go index b350af99b97..679ab441d19 100644 --- a/mvcc/watcher_test.go +++ b/mvcc/watcher_test.go @@ -32,7 +32,7 @@ import ( // and the watched event attaches the correct watchID. func TestWatcherWatchID(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -82,7 +82,7 @@ func TestWatcherWatchID(t *testing.T) { func TestWatcherRequestsCustomID(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -119,7 +119,7 @@ func TestWatcherRequestsCustomID(t *testing.T) { // and returns events with matching prefixes. func TestWatcherWatchPrefix(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -193,7 +193,7 @@ func TestWatcherWatchPrefix(t *testing.T) { // does not create watcher, which panics when canceling in range tree. func TestWatcherWatchWrongRange(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -213,7 +213,7 @@ func TestWatcherWatchWrongRange(t *testing.T) { func TestWatchDeleteRange(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) defer func() { s.store.Close() @@ -252,7 +252,7 @@ func TestWatchDeleteRange(t *testing.T) { // with given id inside watchStream. func TestWatchStreamCancelWatcherByID(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -344,7 +344,7 @@ func TestWatcherRequestProgress(t *testing.T) { func TestWatcherWatchWithFilter(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream()