Skip to content

Commit ff5e5dd

Browse files
committed
etcdserver: fix incorrect metrics generated when clients cancel watches
Before this patch, a client which cancels the context for a watch results in the server generating a `rpctypes.ErrGRPCNoLeader` error that leads the recording of a gRPC `Unavailable` metric in association with the client watch cancellation. The metric looks like this: grpc_server_handled_total{grpc_code="Unavailable",grpc_method="Watch",grpc_service="etcdserverpb.Watch",grpc_type="bidi_stream"} So, the watch server has misidentified the error as a server error and then propagates the mistake to metrics, leading to a false indicator that the leader has been lost. This false signal then leads to false alerting. The commit 9c103dd introduced an interceptor which wraps watch streams requiring a leader, causing those streams to be actively canceled when leader loss is detected. However, the error handling code assumes all stream context cancellations are from the interceptor. This assumption is broken when the context was canceled because of a client stream cancelation. The core challenge is lack of information conveyed via `context.Context` which is shared by both the send and receive sides of the stream handling and is subject to cancellation by all paths (including the gRPC library itself). If any piece of the system cancels the shared context, there's no way for a context consumer to understand who cancelled the context or why. To solve the ambiguity of the stream interceptor code specifically, this patch introduces a custom context struct which the interceptor uses to expose a custom error through the context when the interceptor decides to actively cancel a stream. Now the consuming side can more safely assume a generic context cancellation can be propagated as a cancellation, and the server generated leader error is preserved and propagated normally without any special inference. When a client cancels the stream, there remains a race in the error handling code between the send and receive goroutines whereby the underlying gRPC error is lost in the case where the send path returns and is handled first, but this issue can be taken separately as no matter which paths wins, we can detect a generic cancellation. This is a replacement of etcd-io#11375. Fixes etcd-io#10289, etcd-io#9725, etcd-io#9576, etcd-io#9166
1 parent c11ddc6 commit ff5e5dd

File tree

4 files changed

+68
-10
lines changed

4 files changed

+68
-10
lines changed

api/v3rpc/rpctypes/error.go

+2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ var (
3535
ErrGRPCLeaseExist = status.New(codes.FailedPrecondition, "etcdserver: lease already exists").Err()
3636
ErrGRPCLeaseTTLTooLarge = status.New(codes.OutOfRange, "etcdserver: too large lease TTL").Err()
3737

38+
ErrGRPCWatchCanceled = status.New(codes.Canceled, "etcdserver: watch canceled").Err()
39+
3840
ErrGRPCMemberExist = status.New(codes.FailedPrecondition, "etcdserver: member ID already exist").Err()
3941
ErrGRPCPeerURLExist = status.New(codes.FailedPrecondition, "etcdserver: Peer URLs already exists").Err()
4042
ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err()

server/etcdserver/api/v3rpc/interceptor.go

+49-6
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,8 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
231231
return rpctypes.ErrGRPCNoLeader
232232
}
233233

234-
cctx, cancel := context.WithCancel(ss.Context())
235-
ss = serverStreamWithCtx{ctx: cctx, cancel: &cancel, ServerStream: ss}
234+
ctx := newCancellableContext(ss.Context())
235+
ss = serverStreamWithCtx{ctx: newCancellableContext(ss.Context()), ServerStream: ss}
236236

237237
smap.mu.Lock()
238238
smap.streams[ss] = struct{}{}
@@ -242,7 +242,8 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
242242
smap.mu.Lock()
243243
delete(smap.streams, ss)
244244
smap.mu.Unlock()
245-
cancel()
245+
// TODO: investigate whether the reason for cancellation here is useful to know
246+
ctx.Cancel(nil)
246247
}()
247248
}
248249
}
@@ -251,10 +252,52 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
251252
}
252253
}
253254

255+
// cancellableContext wraps a context with new cancellable context that allows a
256+
// specific cancellation error to be preserved and later retrieved using the
257+
// Context.Err() function. This is so downstream context users can disambiguate
258+
// the reason for the cancellation which could be from the client (for example)
259+
// or from this interceptor code.
260+
type cancellableContext struct {
261+
context.Context
262+
263+
lock sync.RWMutex
264+
cancel context.CancelFunc
265+
cancelReason error
266+
}
267+
268+
func newCancellableContext(parent context.Context) *cancellableContext {
269+
ctx, cancel := context.WithCancel(parent)
270+
return &cancellableContext{
271+
Context: ctx,
272+
cancel: cancel,
273+
}
274+
}
275+
276+
// Cancel stores the cancellation reason and then delegates to context.WithCancel
277+
// against the parent context.
278+
func (c *cancellableContext) Cancel(reason error) {
279+
c.lock.Lock()
280+
c.cancelReason = reason
281+
c.lock.Unlock()
282+
c.cancel()
283+
}
284+
285+
// Err will return the preserved cancel reason error if present, and will
286+
// otherwise return the underlying error from the parent context.
287+
func (c *cancellableContext) Err() error {
288+
c.lock.RLock()
289+
defer c.lock.RUnlock()
290+
if c.cancelReason != nil {
291+
return c.cancelReason
292+
}
293+
return c.Context.Err()
294+
}
295+
254296
type serverStreamWithCtx struct {
255297
grpc.ServerStream
256-
ctx context.Context
257-
cancel *context.CancelFunc
298+
299+
// ctx is used so that we can preserve a reason for cancellation.
300+
ctx *cancellableContext
258301
}
259302

260303
func (ssc serverStreamWithCtx) Context() context.Context { return ssc.ctx }
@@ -286,7 +329,7 @@ func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
286329
smap.mu.Lock()
287330
for ss := range smap.streams {
288331
if ssWithCtx, ok := ss.(serverStreamWithCtx); ok {
289-
(*ssWithCtx.cancel)()
332+
ssWithCtx.ctx.Cancel(rpctypes.ErrGRPCNoLeader)
290333
<-ss.Context().Done()
291334
}
292335
}

server/etcdserver/api/v3rpc/watch.go

+13-3
Original file line numberDiff line numberDiff line change
@@ -197,15 +197,25 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
197197
}
198198
}()
199199

200+
// TODO: There's a race here. When a stream is closed (e.g. due to a cancellation),
201+
// the underlying error (e.g. a gRPC stream error) may be returned and handled
202+
// through errc if the recv goroutine finishes before the send goroutine.
203+
// When the recv goroutine wins, the stream error is retained. When recv loses
204+
// the race, the underlying error is lost (unless the root error is propagated
205+
// through Context.Err() which is not always the case (as callers have to decide
206+
// to implement a custom context to do so). The stdlib context package builtins
207+
// may be insufficient to carry semantically useful errors around and should be
208+
// revisited.
200209
select {
201210
case err = <-errc:
211+
if err == context.Canceled {
212+
err = rpctypes.ErrGRPCWatchCanceled
213+
}
202214
close(sws.ctrlStream)
203-
204215
case <-stream.Context().Done():
205216
err = stream.Context().Err()
206-
// the only server-side cancellation is noleader for now.
207217
if err == context.Canceled {
208-
err = rpctypes.ErrGRPCNoLeader
218+
err = rpctypes.ErrGRPCWatchCanceled
209219
}
210220
}
211221

tests/e2e/metrics_test.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func metricsTest(cx ctlCtx) {
4949
{"/metrics", fmt.Sprintf("etcd_mvcc_delete_total 3")},
5050
{"/metrics", fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version)},
5151
{"/metrics", fmt.Sprintf(`etcd_cluster_version{cluster_version="%s"} 1`, version.Cluster(version.Version))},
52+
{"/metrics", fmt.Sprintf(`grpc_server_handled_total{grpc_code="Canceled",grpc_method="Watch",grpc_service="etcdserverpb.Watch",grpc_type="bidi_stream"} 6`)},
5253
{"/health", `{"health":"true","reason":""}`},
5354
} {
5455
i++
@@ -58,7 +59,9 @@ func metricsTest(cx ctlCtx) {
5859
if err := ctlV3Del(cx, []string{fmt.Sprintf("%d", i)}, 1); err != nil {
5960
cx.t.Fatal(err)
6061
}
61-
62+
if err := ctlV3Watch(cx, []string{"k", "--rev", "1"}, []kvExec{{key: "k", val: "v"}}...); err != nil {
63+
cx.t.Fatal(err)
64+
}
6265
if err := cURLGet(cx.epc, cURLReq{endpoint: test.endpoint, expected: test.expected, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
6366
cx.t.Fatalf("failed get with curl (%v)", err)
6467
}

0 commit comments

Comments
 (0)