Skip to content

Commit 9d25cb3

Browse files
committed
client: clean up v1 balancer wrapper error handling
1 parent 4be7750 commit 9d25cb3

File tree

4 files changed

+53
-36
lines changed

4 files changed

+53
-36
lines changed

Diff for: balancer/balancer.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -214,9 +214,10 @@ type Picker interface {
214214
// - Else (error is other non-nil error):
215215
// - The RPC will fail with unavailable error.
216216
//
217-
// The returned done() function will be called once the rpc has finished, with the
218-
// final status of that RPC.
219-
// done may be nil if balancer doesn't care about the RPC status.
217+
// The returned done() function will be called once the rpc has finished,
218+
// with the final status of that RPC. If the SubConn returned is not a
219+
// valid SubConn type, done may not be called. done may be nil if balancer
220+
// doesn't care about the RPC status.
220221
Pick(ctx context.Context, opts PickOptions) (conn SubConn, done func(DoneInfo), err error)
221222
}
222223

Diff for: balancer_v1_wrapper.go

+40-25
Original file line numberDiff line numberDiff line change
@@ -281,9 +281,8 @@ func (bw *balancerWrapper) Close() {
281281
}
282282

283283
// The picker is the balancerWrapper itself.
284-
// Pick should never return ErrNoSubConnAvailable.
285284
// It either blocks or returns error, consistent with v1 balancer Get().
286-
func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
285+
func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions) (sc balancer.SubConn, done func(balancer.DoneInfo), err error) {
287286
failfast := true // Default failfast is true.
288287
if ss, ok := rpcInfoFromContext(ctx); ok {
289288
failfast = ss.failfast
@@ -292,35 +291,51 @@ func (bw *balancerWrapper) Pick(ctx context.Context, opts balancer.PickOptions)
292291
if err != nil {
293292
return nil, nil, err
294293
}
295-
var done func(balancer.DoneInfo)
296294
if p != nil {
297-
done = func(i balancer.DoneInfo) { p() }
295+
done = func(balancer.DoneInfo) { p() }
296+
defer func() {
297+
if err != nil {
298+
p()
299+
}
300+
}()
298301
}
299-
var sc balancer.SubConn
302+
300303
bw.mu.Lock()
301304
defer bw.mu.Unlock()
302305
if bw.pickfirst {
303306
// Get the first sc in conns.
304-
for _, sc = range bw.conns {
305-
break
306-
}
307-
} else {
308-
var ok bool
309-
sc, ok = bw.conns[resolver.Address{
310-
Addr: a.Addr,
311-
Type: resolver.Backend,
312-
ServerName: "",
313-
Metadata: a.Metadata,
314-
}]
315-
if !ok && failfast {
316-
return nil, nil, balancer.ErrTransientFailure
317-
}
318-
if s, ok := bw.connSt[sc]; failfast && (!ok || s.s != connectivity.Ready) {
319-
// If the returned sc is not ready and RPC is failfast,
320-
// return error, and this RPC will fail.
321-
return nil, nil, balancer.ErrTransientFailure
307+
for _, sc := range bw.conns {
308+
return sc, done, nil
322309
}
310+
return nil, nil, balancer.ErrNoSubConnAvailable
311+
}
312+
sc, ok1 := bw.conns[resolver.Address{
313+
Addr: a.Addr,
314+
Type: resolver.Backend,
315+
ServerName: "",
316+
Metadata: a.Metadata,
317+
}]
318+
s, ok2 := bw.connSt[sc]
319+
if !ok1 || !ok2 {
320+
// This can only happen due to a race where Get() returned an address
321+
// that was subsequently removed by Notify. In this case we should
322+
// retry always.
323+
return nil, nil, balancer.ErrNoSubConnAvailable
324+
}
325+
switch s.s {
326+
case connectivity.Ready, connectivity.Idle:
327+
return sc, done, nil
328+
case connectivity.Shutdown, connectivity.TransientFailure:
329+
// If the returned sc has been shut down or is in transient failure,
330+
// return error, and this RPC will fail or wait for another picker (if
331+
// non-failfast).
332+
return nil, nil, balancer.ErrTransientFailure
333+
default:
334+
// For other states (connecting or unknown), the v1 balancer would
335+
// traditionally wait until ready and then issue the RPC. Returning
336+
// ErrNoSubConnAvailable will be a slight improvement in that it will
337+
// allow the balancer to choose another address in case others are
338+
// connected.
339+
return nil, nil, balancer.ErrNoSubConnAvailable
323340
}
324-
325-
return sc, done, nil
326341
}

Diff for: picker_wrapper.go

+3-6
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,7 @@ func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) f
101101
// - the subConn returned by the current picker is not READY
102102
// When one of these situations happens, pick blocks until the picker gets updated.
103103
func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) {
104-
var (
105-
p balancer.Picker
106-
ch chan struct{}
107-
)
104+
var ch chan struct{}
108105

109106
for {
110107
bp.mu.Lock()
@@ -130,7 +127,7 @@ func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.
130127
}
131128

132129
ch = bp.blockingCh
133-
p = bp.picker
130+
p := bp.picker
134131
bp.mu.Unlock()
135132

136133
subConn, done, err := p.Pick(ctx, opts)
@@ -152,7 +149,7 @@ func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.
152149

153150
acw, ok := subConn.(*acBalancerWrapper)
154151
if !ok {
155-
grpclog.Infof("subconn returned from pick is not *acBalancerWrapper")
152+
grpclog.Error("subconn returned from pick is not *acBalancerWrapper")
156153
continue
157154
}
158155
if t, ok := acw.getAddrConn().getReadyTransport(); ok {

Diff for: test/end2end_test.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -1037,7 +1037,9 @@ func testServerGoAway(t *testing.T, e env) {
10371037
cc := te.clientConn()
10381038
tc := testpb.NewTestServiceClient(cc)
10391039
// Finish an RPC to make sure the connection is good.
1040-
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
1040+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1041+
defer cancel()
1042+
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err != nil {
10411043
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
10421044
}
10431045
ch := make(chan struct{})
@@ -1055,7 +1057,9 @@ func testServerGoAway(t *testing.T, e env) {
10551057
cancel()
10561058
}
10571059
// A new RPC should fail.
1058-
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable && status.Code(err) != codes.Internal {
1060+
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
1061+
defer cancel()
1062+
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable && status.Code(err) != codes.Internal {
10591063
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s or %s", err, codes.Unavailable, codes.Internal)
10601064
}
10611065
<-ch

0 commit comments

Comments
 (0)