Skip to content

Commit 9ecb611

Browse files
authored
grpclb: drop only when at least one SubConn is ready (#2630)
1 parent 5878d96 commit 9ecb611

File tree

2 files changed

+45
-35
lines changed

2 files changed

+45
-35
lines changed

Diff for: balancer/grpclb/grpclb.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -258,11 +258,14 @@ func (lb *lbBalancer) regeneratePicker() {
258258
}
259259
}
260260

261+
if len(readySCs) <= 0 {
262+
// If there are no ready SubConns, always re-pick. This is to avoid drops
263+
// unless at least one SubConn is ready. Otherwise we may drop more
264+
// often than we want because of drops + re-picks (which become re-drops).
265+
lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
266+
return
267+
}
261268
if len(lb.fullServerList) <= 0 {
262-
if len(readySCs) <= 0 {
263-
lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
264-
return
265-
}
266269
lb.picker = &rrPicker{subConns: readySCs}
267270
return
268271
}

Diff for: balancer/grpclb/grpclb_test.go

+38-31
Original file line numberDiff line numberDiff line change
@@ -744,14 +744,18 @@ func runAndGetStats(t *testing.T, drop bool, runRPCs func(*grpc.ClientConn)) *rp
744744
t.Fatalf("failed to create new load balancer: %v", err)
745745
}
746746
defer cleanup()
747-
tss.ls.sls <- &lbpb.ServerList{
748-
Servers: []*lbpb.Server{{
749-
IpAddress: tss.beIPs[0],
750-
Port: int32(tss.bePorts[0]),
747+
servers := []*lbpb.Server{{
748+
IpAddress: tss.beIPs[0],
749+
Port: int32(tss.bePorts[0]),
750+
LoadBalanceToken: lbToken,
751+
}}
752+
if drop {
753+
servers = append(servers, &lbpb.Server{
751754
LoadBalanceToken: lbToken,
752755
Drop: drop,
753-
}},
756+
})
754757
}
758+
tss.ls.sls <- &lbpb.ServerList{Servers: servers}
755759
tss.ls.statsDura = 100 * time.Millisecond
756760
creds := serverNameCheckCreds{expected: beServerName}
757761

@@ -781,7 +785,6 @@ func runAndGetStats(t *testing.T, drop bool, runRPCs func(*grpc.ClientConn)) *rp
781785
const (
782786
countRPC = 40
783787
failtosendURI = "failtosend"
784-
dropErrDesc = "request dropped by grpclb"
785788
)
786789

787790
func TestGRPCLBStatsUnarySuccess(t *testing.T) {
@@ -808,27 +811,22 @@ func TestGRPCLBStatsUnarySuccess(t *testing.T) {
808811

809812
func TestGRPCLBStatsUnaryDrop(t *testing.T) {
810813
defer leakcheck.Check(t)
811-
c := 0
812814
stats := runAndGetStats(t, true, func(cc *grpc.ClientConn) {
813815
testC := testpb.NewTestServiceClient(cc)
814-
for {
815-
c++
816-
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
817-
if strings.Contains(err.Error(), dropErrDesc) {
818-
break
819-
}
820-
}
816+
// The first non-failfast RPC succeeds, all connections are up.
817+
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
818+
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
821819
}
822-
for i := 0; i < countRPC; i++ {
820+
for i := 0; i < countRPC-1; i++ {
823821
testC.EmptyCall(context.Background(), &testpb.Empty{})
824822
}
825823
})
826824

827825
if err := checkStats(stats, &rpcStats{
828-
numCallsStarted: int64(countRPC + c),
829-
numCallsFinished: int64(countRPC + c),
830-
numCallsFinishedWithClientFailedToSend: int64(c - 1),
831-
numCallsDropped: map[string]int64{lbToken: int64(countRPC + 1)},
826+
numCallsStarted: int64(countRPC),
827+
numCallsFinished: int64(countRPC),
828+
numCallsFinishedKnownReceived: int64(countRPC) / 2,
829+
numCallsDropped: map[string]int64{lbToken: int64(countRPC) / 2},
832830
}); err != nil {
833831
t.Fatal(err)
834832
}
@@ -895,27 +893,36 @@ func TestGRPCLBStatsStreamingSuccess(t *testing.T) {
895893

896894
func TestGRPCLBStatsStreamingDrop(t *testing.T) {
897895
defer leakcheck.Check(t)
898-
c := 0
899896
stats := runAndGetStats(t, true, func(cc *grpc.ClientConn) {
900897
testC := testpb.NewTestServiceClient(cc)
898+
// The first non-failfast RPC succeeds, all connections are up.
899+
stream, err := testC.FullDuplexCall(context.Background(), grpc.WaitForReady(true))
900+
if err != nil {
901+
t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, <nil>", testC, err)
902+
}
901903
for {
902-
c++
903-
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
904-
if strings.Contains(err.Error(), dropErrDesc) {
905-
break
906-
}
904+
if _, err = stream.Recv(); err == io.EOF {
905+
break
907906
}
908907
}
909-
for i := 0; i < countRPC; i++ {
910-
testC.FullDuplexCall(context.Background())
908+
for i := 0; i < countRPC-1; i++ {
909+
stream, err = testC.FullDuplexCall(context.Background())
910+
if err == nil {
911+
// Wait for stream to end if err is nil.
912+
for {
913+
if _, err = stream.Recv(); err == io.EOF {
914+
break
915+
}
916+
}
917+
}
911918
}
912919
})
913920

914921
if err := checkStats(stats, &rpcStats{
915-
numCallsStarted: int64(countRPC + c),
916-
numCallsFinished: int64(countRPC + c),
917-
numCallsFinishedWithClientFailedToSend: int64(c - 1),
918-
numCallsDropped: map[string]int64{lbToken: int64(countRPC + 1)},
922+
numCallsStarted: int64(countRPC),
923+
numCallsFinished: int64(countRPC),
924+
numCallsFinishedKnownReceived: int64(countRPC) / 2,
925+
numCallsDropped: map[string]int64{lbToken: int64(countRPC) / 2},
919926
}); err != nil {
920927
t.Fatal(err)
921928
}

0 commit comments

Comments
 (0)