
Commit 9c103dd

*: cancel required leader streams when member lost its leader
1 parent 2e01105 commit 9c103dd

9 files changed (+217 -44 lines changed)


etcdserver/api/v3rpc/grpc.go (+1 -1)

```diff
@@ -29,7 +29,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server {
 		opts = append(opts, grpc.Creds(credentials.NewTLS(tls)))
 	}
 	opts = append(opts, grpc.UnaryInterceptor(newUnaryInterceptor(s)))
-	opts = append(opts, grpc.StreamInterceptor(metricsStreamInterceptor))
+	opts = append(opts, grpc.StreamInterceptor(newStreamInterceptor(s)))
 
 	grpcServer := grpc.NewServer(opts...)
 	pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
```
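
The interceptor swapped in here only affects streams that carry the require-leader metadata (see the interceptor.go hunk below). As a hypothetical client-side sketch, not part of this commit, a caller could attach that metadata roughly as follows, reusing the rpctypes constants the interceptor checks; metadata.NewContext reflects the grpc-go API of this era, which newer releases replace with NewOutgoingContext.

```go
// Hypothetical client-side sketch (not part of this commit): mark an outgoing
// stream as requiring a leader so the server-side stream interceptor applies.
package main

import (
	"fmt"

	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
	"golang.org/x/net/context"
	"google.golang.org/grpc/metadata"
)

func main() {
	// The constant names come from the interceptor.go diff below.
	md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
	// NewContext is the grpc-go call of this era; newer grpc-go uses NewOutgoingContext.
	ctx := metadata.NewContext(context.Background(), md)
	fmt.Println(ctx, md)
}
```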

etcdserver/api/v3rpc/interceptor.go (+90)

```diff
@@ -16,6 +16,7 @@ package v3rpc
 
 import (
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/coreos/etcd/etcdserver"
@@ -28,6 +29,15 @@ import (
 	"google.golang.org/grpc/metadata"
 )
 
+const (
+	maxNoLeaderCnt = 3
+)
+
+type streamsMap struct {
+	mu      sync.Mutex
+	streams map[grpc.ServerStream]struct{}
+}
+
 func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
 	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
 		md, ok := metadata.FromContext(ctx)
@@ -42,6 +52,37 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
 	}
 }
 
+func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
+	smap := monitorLeader(s)
+
+	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+		md, ok := metadata.FromContext(ss.Context())
+		if ok {
+			if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
+				if s.Leader() == types.ID(raft.None) {
+					return rpctypes.ErrGRPCNoLeader
+				}
+
+				cctx, cancel := context.WithCancel(ss.Context())
+				ss = serverStreamWithCtx{ctx: cctx, cancel: &cancel, ServerStream: ss}
+
+				smap.mu.Lock()
+				smap.streams[ss] = struct{}{}
+				smap.mu.Unlock()
+
+				defer func() {
+					smap.mu.Lock()
+					delete(smap.streams, ss)
+					smap.mu.Unlock()
+					cancel()
+				}()
+
+			}
+		}
+		return metricsStreamInterceptor(srv, ss, info, handler)
+	}
+}
+
 func metricsUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
 	service, method := splitMethodName(info.FullMethod)
 	receivedCounter.WithLabelValues(service, method).Inc()
@@ -75,3 +116,52 @@ func splitMethodName(fullMethodName string) (string, string) {
 	}
 	return "unknown", "unknown"
 }
+
+type serverStreamWithCtx struct {
+	grpc.ServerStream
+	ctx    context.Context
+	cancel *context.CancelFunc
+}
+
+func (ssc serverStreamWithCtx) Context() context.Context { return ssc.ctx }
+
+func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
+	smap := &streamsMap{
+		streams: make(map[grpc.ServerStream]struct{}),
+	}
+
+	go func() {
+		election := time.Duration(s.Cfg.TickMs) * time.Duration(s.Cfg.ElectionTicks) * time.Millisecond
+		noLeaderCnt := 0
+
+		for {
+			select {
+			case <-s.StopNotify():
+				return
+			case <-time.After(election):
+				if s.Leader() == types.ID(raft.None) {
+					noLeaderCnt++
+				} else {
+					noLeaderCnt = 0
+				}
+
+				// We are more conservative on canceling existing streams. Reconnecting streams
+				// cost much more than just rejecting new requests. So we wait until the member
+				// cannot find a leader for maxNoLeaderCnt election timeouts to cancel existing streams.
+				if noLeaderCnt >= maxNoLeaderCnt {
+					smap.mu.Lock()
+					for ss := range smap.streams {
+						if ssWithCtx, ok := ss.(serverStreamWithCtx); ok {
+							(*ssWithCtx.cancel)()
+							<-ss.Context().Done()
+						}
+					}
+					smap.streams = make(map[grpc.ServerStream]struct{})
+					smap.mu.Unlock()
+				}
+			}
+		}
+	}()
+
+	return smap
+}
```
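
To put rough numbers on monitorLeader above: it samples leadership once per election timeout (TickMs × ElectionTicks) and cancels streams only after maxNoLeaderCnt consecutive leaderless samples. A minimal sketch of that arithmetic, assuming a 100 ms tick and a 10-tick election timeout (illustrative defaults; the real values come from the member's configuration):

```go
// Back-of-the-envelope sketch for monitorLeader's cancellation delay.
// The tick and election values below are assumed, not read from a running etcd.
package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		tickMs         = 100 // assumed heartbeat tick, in milliseconds
		electionTicks  = 10  // assumed election timeout, in ticks
		maxNoLeaderCnt = 3   // matches the constant added in interceptor.go
	)

	// Same computation as monitorLeader: one leadership sample per election timeout.
	election := time.Duration(tickMs) * time.Duration(electionTicks) * time.Millisecond
	fmt.Println("sample interval:", election)                                      // 1s
	fmt.Println("streams canceled after:", time.Duration(maxNoLeaderCnt)*election) // ~3s without a leader
}
```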

etcdserver/api/v3rpc/watch.go (+19 -2)

```diff
@@ -19,7 +19,10 @@ import (
 	"sync"
 	"time"
 
+	"golang.org/x/net/context"
+
 	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/mvcc"
 	"github.com/coreos/etcd/mvcc/mvccpb"
@@ -105,10 +108,24 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
 		progress: make(map[mvcc.WatchID]bool),
 		closec:   make(chan struct{}),
 	}
-	defer sws.close()
 
 	go sws.sendLoop()
-	return sws.recvLoop()
+	errc := make(chan error, 1)
+	go func() {
+		errc <- sws.recvLoop()
+		sws.close()
+	}()
+	select {
+	case err := <-errc:
+		return err
+	case <-stream.Context().Done():
+		err := stream.Context().Err()
+		// the only server-side cancellation is noleader for now.
+		if err == context.Canceled {
+			return rpctypes.ErrGRPCNoLeader
+		}
+		return err
+	}
 }
 
 func (sws *serverWatchStream) recvLoop() error {
```
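
The Watch handler now races recvLoop against the stream context so that the interceptor's cancellation surfaces as ErrGRPCNoLeader instead of leaving the stream hanging. A standalone sketch of that select pattern, with illustrative names (errNoLeader and serve stand in for rpctypes.ErrGRPCNoLeader and the real handler):

```go
// Standalone sketch of the recvLoop-vs-context race used in Watch above.
package main

import (
	"errors"
	"fmt"
	"time"

	"golang.org/x/net/context"
)

var errNoLeader = errors.New("etcdserver: no leader") // stand-in for rpctypes.ErrGRPCNoLeader

func serve(ctx context.Context, recvLoop func() error) error {
	errc := make(chan error, 1)
	go func() { errc <- recvLoop() }()
	select {
	case err := <-errc:
		return err // receive loop finished first (client closed or errored)
	case <-ctx.Done():
		// The only server-side cancellation is the no-leader case for now,
		// so a plain cancellation is translated into the no-leader error.
		if ctx.Err() == context.Canceled {
			return errNoLeader
		}
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	// Simulate the stream interceptor canceling the stream after losing the leader.
	go func() {
		time.Sleep(10 * time.Millisecond)
		cancel()
	}()
	// A receive loop that never returns on its own.
	err := serve(ctx, func() error { select {} })
	fmt.Println(err) // etcdserver: no leader
}
```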

etcdserver/quota.go (+5 -5)

```diff
@@ -50,20 +50,20 @@ const (
 )
 
 func NewBackendQuota(s *EtcdServer) Quota {
-	if s.cfg.QuotaBackendBytes < 0 {
+	if s.Cfg.QuotaBackendBytes < 0 {
 		// disable quotas if negative
 		plog.Warningf("disabling backend quota")
 		return &passthroughQuota{}
 	}
-	if s.cfg.QuotaBackendBytes == 0 {
+	if s.Cfg.QuotaBackendBytes == 0 {
 		// use default size if no quota size given
 		return &backendQuota{s, backend.DefaultQuotaBytes}
 	}
-	if s.cfg.QuotaBackendBytes > backend.MaxQuotaBytes {
-		plog.Warningf("backend quota %v exceeds maximum quota %v; using maximum", s.cfg.QuotaBackendBytes, backend.MaxQuotaBytes)
+	if s.Cfg.QuotaBackendBytes > backend.MaxQuotaBytes {
+		plog.Warningf("backend quota %v exceeds maximum quota %v; using maximum", s.Cfg.QuotaBackendBytes, backend.MaxQuotaBytes)
 		return &backendQuota{s, backend.MaxQuotaBytes}
 	}
-	return &backendQuota{s, s.cfg.QuotaBackendBytes}
+	return &backendQuota{s, s.Cfg.QuotaBackendBytes}
 }
 
 func (b *backendQuota) Available(v interface{}) bool {
```

etcdserver/raft.go (+4 -4)

```diff
@@ -134,8 +134,8 @@ func (r *raftNode) start(s *EtcdServer) {
 	r.done = make(chan struct{})
 
 	heartbeat := 200 * time.Millisecond
-	if s.cfg != nil {
-		heartbeat = time.Duration(s.cfg.TickMs) * time.Millisecond
+	if s.Cfg != nil {
+		heartbeat = time.Duration(s.Cfg.TickMs) * time.Millisecond
 	}
 	// set up contention detectors for raft heartbeat message.
 	// expect to send a heartbeat within 2 heartbeat intervals.
@@ -173,7 +173,7 @@
 					// it promotes or demotes instead of modifying server directly.
 					syncC = r.s.SyncTicker
 					if r.s.lessor != nil {
-						r.s.lessor.Promote(r.s.cfg.electionTimeout())
+						r.s.lessor.Promote(r.s.Cfg.electionTimeout())
 					}
 					// TODO: remove the nil checking
 					// current test utility does not provide the stats
@@ -238,7 +238,7 @@
 				raftDone <- struct{}{}
 				r.Advance()
 			case <-syncC:
-				r.s.sync(r.s.cfg.ReqTimeout())
+				r.s.sync(r.s.Cfg.ReqTimeout())
 			case <-r.stopped:
 				return
 			}
```

etcdserver/server.go (+14 -14)

```diff
@@ -162,11 +162,11 @@ type EtcdServer struct {
 	// count the number of inflight snapshots.
 	// MUST use atomic operation to access this field.
 	inflightSnapshots int64
+	Cfg               *ServerConfig
 
 	readych chan struct{}
 	r       raftNode
 
-	cfg       *ServerConfig
 	snapCount uint64
 
 	w          wait.Wait
@@ -369,7 +369,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
 
 	srv = &EtcdServer{
 		readych:   make(chan struct{}),
-		cfg:       cfg,
+		Cfg:       cfg,
 		snapCount: cfg.SnapCount,
 		errorc:    make(chan error, 1),
 		store:     st,
@@ -444,7 +444,7 @@
 // It also starts a goroutine to publish its server information.
 func (s *EtcdServer) Start() {
 	s.start()
-	go s.publish(s.cfg.ReqTimeout())
+	go s.publish(s.Cfg.ReqTimeout())
 	go s.purgeFile()
 	go monitorFileDescriptor(s.done)
 	go s.monitorVersions()
@@ -473,11 +473,11 @@
 
 func (s *EtcdServer) purgeFile() {
 	var serrc, werrc <-chan error
-	if s.cfg.MaxSnapFiles > 0 {
-		serrc = fileutil.PurgeFile(s.cfg.SnapDir(), "snap", s.cfg.MaxSnapFiles, purgeFileInterval, s.done)
+	if s.Cfg.MaxSnapFiles > 0 {
+		serrc = fileutil.PurgeFile(s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.done)
 	}
-	if s.cfg.MaxWALFiles > 0 {
-		werrc = fileutil.PurgeFile(s.cfg.WALDir(), "wal", s.cfg.MaxWALFiles, purgeFileInterval, s.done)
+	if s.Cfg.MaxWALFiles > 0 {
+		werrc = fileutil.PurgeFile(s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.done)
 	}
 	select {
 	case e := <-werrc:
@@ -623,7 +623,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
 			plog.Panicf("get database snapshot file path error: %v", err)
 		}
 
-		fn := path.Join(s.cfg.SnapDir(), databaseFilename)
+		fn := path.Join(s.Cfg.SnapDir(), databaseFilename)
 		if err := os.Rename(snapfn, fn); err != nil {
 			plog.Panicf("rename snapshot file error: %v", err)
 		}
@@ -764,7 +764,7 @@ func (s *EtcdServer) LeaderStats() []byte {
 func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() }
 
 func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) error {
-	if s.cfg.StrictReconfigCheck && !s.cluster.IsReadyToAddNewMember() {
+	if s.Cfg.StrictReconfigCheck && !s.cluster.IsReadyToAddNewMember() {
 		// If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd.
 		// In such a case adding a new member is allowed unconditionally
 		return ErrNotEnoughStartedMembers
@@ -784,7 +784,7 @@
 }
 
 func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error {
-	if s.cfg.StrictReconfigCheck && !s.cluster.IsReadyToRemoveMember(id) {
+	if s.Cfg.StrictReconfigCheck && !s.cluster.IsReadyToRemoveMember(id) {
 		// If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd.
 		// In such a case removing a member is allowed unconditionally
 		return ErrNotEnoughStartedMembers
@@ -823,7 +823,7 @@ func (s *EtcdServer) Lead() uint64 { return atomic.LoadUint64(&s.r.lead) }
 
 func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) }
 
-func (s *EtcdServer) IsPprofEnabled() bool { return s.cfg.EnablePprof }
+func (s *EtcdServer) IsPprofEnabled() bool { return s.Cfg.EnablePprof }
 
 // configure sends a configuration change through consensus and
 // then waits for it to be applied to the server. It
@@ -939,7 +939,7 @@ func (s *EtcdServer) send(ms []raftpb.Message) {
 			ok, exceed := s.r.td.Observe(ms[i].To)
 			if !ok {
 				// TODO: limit request rate.
-				plog.Warningf("failed to send out heartbeat on time (exceeded the %dms timeout for %v)", s.cfg.TickMs, exceed)
+				plog.Warningf("failed to send out heartbeat on time (exceeded the %dms timeout for %v)", s.Cfg.TickMs, exceed)
 				plog.Warningf("server is likely overloaded")
 			}
 		}
@@ -1221,7 +1221,7 @@ func (s *EtcdServer) updateClusterVersion(ver string) {
 		Path: membership.StoreClusterVersionKey(),
 		Val:  ver,
 	}
-	ctx, cancel := context.WithTimeout(context.Background(), s.cfg.ReqTimeout())
+	ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
 	_, err := s.Do(ctx, req)
 	cancel()
 	switch err {
@@ -1241,7 +1241,7 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
 		return ErrCanceled
 	case context.DeadlineExceeded:
 		curLeadElected := s.r.leadElectedTime()
-		prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.cfg.ElectionTicks) * time.Duration(s.cfg.TickMs) * time.Millisecond)
+		prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond)
 		if start.After(prevLeadLost) && start.Before(curLeadElected) {
 			return ErrTimeoutDueToLeaderFail
 		}
```
