
Commit 7cb2c9b

grpclb: drop only when at least one SubConn is ready
1 parent 2259ee6 commit 7cb2c9b

File tree

2 files changed: +46 -33 lines


Diff for: balancer/grpclb/grpclb.go

+8 -4
@@ -258,11 +258,15 @@ func (lb *lbBalancer) regeneratePicker() {
         }
     }
 
+    if len(readySCs) <= 0 {
+        // If there's no ready SubConns, always return re-pick. This is to avoid
+        // drops unless at least one SubConn is ready. Otherwise we may drop
+        // more often than want because of drops + re-picks(which becomes
+        // re-drops).
+        lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
+        return
+    }
     if len(lb.fullServerList) <= 0 {
-        if len(readySCs) <= 0 {
-            lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
-            return
-        }
         lb.picker = &rrPicker{subConns: readySCs}
         return
     }

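What the grpclb.go change amounts to: the picker only honors load-balancer drop entries once at least one SubConn is READY; with nothing READY it hands back balancer.ErrNoSubConnAvailable so gRPC re-picks instead of recording a drop. The standalone Go sketch below illustrates that gating with simplified, hypothetical stand-ins (picker, server, errNoSubConnAvailable are not the real grpclb or balancer types); it is not the actual grpclb code.

// Illustrative only: simplified stand-ins, not the real grpclb/balancer types.
package main

import (
    "errors"
    "fmt"
)

// Stand-in for balancer.ErrNoSubConnAvailable: tells gRPC to block and re-pick.
var errNoSubConnAvailable = errors.New("no SubConn is available")

type server struct {
    addr string
    drop bool // true for a load-balancer drop entry
}

type picker struct {
    servers []server // full server list, including drop entries
    ready   []string // addresses that currently have a READY SubConn
    next    int
}

// pick applies the commit's rule: never drop unless at least one SubConn is
// READY; otherwise return a re-pickable error, so drop + re-pick cycles don't
// inflate the drop count.
func (p *picker) pick() (string, error) {
    if len(p.ready) == 0 {
        return "", errNoSubConnAvailable
    }
    s := p.servers[p.next%len(p.servers)]
    p.next++
    if s.drop {
        return "", errors.New("call dropped by the load balancer")
    }
    // Round-robin over the READY SubConns for real picks.
    return p.ready[p.next%len(p.ready)], nil
}

func main() {
    p := &picker{servers: []server{{addr: "backend-1"}, {drop: true}}}

    // Nothing READY yet: the pick is retried later instead of being dropped.
    if _, err := p.pick(); err != nil {
        fmt.Println("re-pick:", err)
    }

    // Once a SubConn is READY, picks alternate between the backend and a drop.
    p.ready = []string{"backend-1"}
    for i := 0; i < 4; i++ {
        addr, err := p.pick()
        fmt.Println(addr, err)
    }
}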
Diff for: balancer/grpclb/grpclb_test.go

+38 -29
@@ -744,14 +744,18 @@ func runAndGetStats(t *testing.T, drop bool, runRPCs func(*grpc.ClientConn)) *rp
         t.Fatalf("failed to create new load balancer: %v", err)
     }
     defer cleanup()
-    tss.ls.sls <- &lbpb.ServerList{
-        Servers: []*lbpb.Server{{
-            IpAddress:        tss.beIPs[0],
-            Port:             int32(tss.bePorts[0]),
+    servers := []*lbpb.Server{{
+        IpAddress:        tss.beIPs[0],
+        Port:             int32(tss.bePorts[0]),
+        LoadBalanceToken: lbToken,
+    }}
+    if drop {
+        servers = append(servers, &lbpb.Server{IpAddress: tss.beIPs[0],
             LoadBalanceToken: lbToken,
             Drop:             drop,
-        }},
+        })
     }
+    tss.ls.sls <- &lbpb.ServerList{Servers: servers}
     tss.ls.statsDura = 100 * time.Millisecond
     creds := serverNameCheckCreds{expected: beServerName}

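Note on the test setup above: when drop is true, the server list sent by the test balancer now holds one real backend plus one drop entry carrying the same lbToken, so a round-robin pick over that list drops every other RPC; that is where the countRPC/2 figures in the two drop tests below come from. A quick back-of-the-envelope check (illustrative only; countRPC here is a placeholder value, the test defines its own):

// Illustrative only: a standalone simulation, not part of the test.
package main

import "fmt"

func main() {
    const countRPC = 10000 // placeholder value; the test defines its own countRPC
    // false = the real backend, true = the drop entry; a round-robin picker
    // walks this list in order.
    entries := []bool{false, true}
    dropped := 0
    for i := 0; i < countRPC; i++ {
        if entries[i%len(entries)] {
            dropped++
        }
    }
    fmt.Println(dropped, countRPC/2) // 5000 5000: half the calls are dropped
}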
@@ -808,27 +812,22 @@ func TestGRPCLBStatsUnarySuccess(t *testing.T) {
 
 func TestGRPCLBStatsUnaryDrop(t *testing.T) {
     defer leakcheck.Check(t)
-    c := 0
     stats := runAndGetStats(t, true, func(cc *grpc.ClientConn) {
         testC := testpb.NewTestServiceClient(cc)
-        for {
-            c++
-            if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
-                if strings.Contains(err.Error(), dropErrDesc) {
-                    break
-                }
-            }
+        // The first non-failfast RPC succeeds, all connections are up.
+        if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
+            t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
         }
-        for i := 0; i < countRPC; i++ {
+        for i := 0; i < countRPC-1; i++ {
             testC.EmptyCall(context.Background(), &testpb.Empty{})
         }
     })
 
     if err := checkStats(stats, &rpcStats{
-        numCallsStarted:                        int64(countRPC + c),
-        numCallsFinished:                       int64(countRPC + c),
-        numCallsFinishedWithClientFailedToSend: int64(c - 1),
-        numCallsDropped:                        map[string]int64{lbToken: int64(countRPC + 1)},
+        numCallsStarted:               int64(countRPC),
+        numCallsFinished:              int64(countRPC),
+        numCallsFinishedKnownReceived: int64(countRPC) / 2,
+        numCallsDropped:               map[string]int64{lbToken: int64(countRPC) / 2},
     }); err != nil {
         t.Fatal(err)
     }
@@ -898,24 +897,34 @@ func TestGRPCLBStatsStreamingDrop(t *testing.T) {
     c := 0
     stats := runAndGetStats(t, true, func(cc *grpc.ClientConn) {
         testC := testpb.NewTestServiceClient(cc)
+        // The first non-failfast RPC succeeds, all connections are up.
+        stream, err := testC.FullDuplexCall(context.Background(), grpc.WaitForReady(true))
+        if err != nil {
+            t.Fatalf("%v.FullDuplexCall(_, _) = _, %v, want _, <nil>", testC, err)
+        }
         for {
-            c++
-            if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
-                if strings.Contains(err.Error(), dropErrDesc) {
-                    break
-                }
+            if _, err = stream.Recv(); err == io.EOF {
+                break
             }
         }
-        for i := 0; i < countRPC; i++ {
-            testC.FullDuplexCall(context.Background())
+        for i := 0; i < countRPC-1; i++ {
+            stream, err = testC.FullDuplexCall(context.Background())
+            if err == nil {
+                // Wait for stream to end if err is nil.
+                for {
+                    if _, err = stream.Recv(); err == io.EOF {
+                        break
+                    }
+                }
+            }
         }
     })
 
     if err := checkStats(stats, &rpcStats{
-        numCallsStarted:                        int64(countRPC + c),
-        numCallsFinished:                       int64(countRPC + c),
-        numCallsFinishedWithClientFailedToSend: int64(c - 1),
-        numCallsDropped:                        map[string]int64{lbToken: int64(countRPC + 1)},
+        numCallsStarted:               int64(countRPC + c),
+        numCallsFinished:              int64(countRPC + c),
+        numCallsFinishedKnownReceived: int64(countRPC) / 2,
+        numCallsDropped:               map[string]int64{lbToken: int64(countRPC) / 2},
     }); err != nil {
         t.Fatal(err)
     }

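A pattern worth noting from the streaming test: each stream is drained with Recv until io.EOF so the call is finished before the next one starts. Below is a minimal, self-contained sketch of that drain loop using a hypothetical recvOnly interface and fakeStream in place of a real gRPC client stream; it is not the test's code.

// Illustrative only: hypothetical types standing in for a gRPC client stream.
package main

import (
    "errors"
    "fmt"
    "io"
)

// recvOnly is the receive side of a stream, as used by the drain loop.
type recvOnly interface {
    Recv() (string, error)
}

// fakeStream yields a fixed number of messages and then io.EOF, like a server
// that half-closes after sending its responses.
type fakeStream struct{ left int }

func (s *fakeStream) Recv() (string, error) {
    if s.left == 0 {
        return "", io.EOF
    }
    s.left--
    return "msg", nil
}

// drain reads until the stream ends: io.EOF means a clean finish, any other
// error (for example a dropped call) is returned to the caller.
func drain(s recvOnly) error {
    for {
        if _, err := s.Recv(); err != nil {
            if errors.Is(err, io.EOF) {
                return nil
            }
            return err
        }
    }
}

func main() {
    fmt.Println(drain(&fakeStream{left: 2})) // <nil>: the stream ended cleanly
}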