Skip to content

Commit 2308131

Browse files
ZhouyihaiDingmenghanl
authored andcommitted
addrConn: change address to slice of address (#1376)
* addrConn: change address to slice of address * add pickfirst balancer to test new addrconn
1 parent 25b4a42 commit 2308131

File tree

3 files changed

+553
-96
lines changed

3 files changed

+553
-96
lines changed

Diff for: balancer.go

+11
Original file line numberDiff line numberDiff line change
@@ -395,3 +395,14 @@ func (rr *roundRobin) Close() error {
395395
}
396396
return nil
397397
}
398+
399+
// pickFirst is used to test multi-addresses in one addrConn in which all addresses share the same addrConn.
400+
// It is a wrapper around roundRobin balancer. The logic of all methods works fine because balancer.Get()
401+
// returns the only address Up by resetTransport().
402+
type pickFirst struct {
403+
*roundRobin
404+
}
405+
406+
func pickFirstBalancer(r naming.Resolver) Balancer {
407+
return &pickFirst{&roundRobin{r: r}}
408+
}

Diff for: balancer_test.go

+357
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package grpc
2121
import (
2222
"fmt"
2323
"math"
24+
"strconv"
2425
"sync"
2526
"testing"
2627
"time"
@@ -421,3 +422,359 @@ func TestOneAddressRemoval(t *testing.T) {
421422
servers[i].stop()
422423
}
423424
}
425+
426+
func checkServerUp(t *testing.T, currentServer *server) {
427+
req := "port"
428+
port := currentServer.port
429+
cc, err := Dial("localhost:"+port, WithBlock(), WithInsecure(), WithCodec(testCodec{}))
430+
if err != nil {
431+
t.Fatalf("Failed to create ClientConn: %v", err)
432+
}
433+
var reply string
434+
for {
435+
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == port {
436+
break
437+
}
438+
time.Sleep(10 * time.Millisecond)
439+
}
440+
cc.Close()
441+
}
442+
443+
func TestPickFirstEmptyAddrs(t *testing.T) {
444+
servers, r := startServers(t, 1, math.MaxUint32)
445+
defer servers[0].stop()
446+
cc, err := Dial("foo.bar.com", WithBalancer(pickFirstBalancer(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
447+
if err != nil {
448+
t.Fatalf("Failed to create ClientConn: %v", err)
449+
}
450+
defer cc.Close()
451+
var reply string
452+
if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse {
453+
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
454+
}
455+
// Inject name resolution change to remove the server so that there is no address
456+
// available after that.
457+
u := &naming.Update{
458+
Op: naming.Delete,
459+
Addr: "localhost:" + servers[0].port,
460+
}
461+
r.w.inject([]*naming.Update{u})
462+
// Loop until the above updates apply.
463+
for {
464+
time.Sleep(10 * time.Millisecond)
465+
ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond)
466+
if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc); err != nil {
467+
break
468+
}
469+
}
470+
}
471+
472+
func TestPickFirstCloseWithPendingRPC(t *testing.T) {
473+
servers, r := startServers(t, 1, math.MaxUint32)
474+
defer servers[0].stop()
475+
cc, err := Dial("foo.bar.com", WithBalancer(pickFirstBalancer(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
476+
if err != nil {
477+
t.Fatalf("Failed to create ClientConn: %v", err)
478+
}
479+
var reply string
480+
if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil {
481+
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
482+
}
483+
// Remove the server.
484+
updates := []*naming.Update{{
485+
Op: naming.Delete,
486+
Addr: "localhost:" + servers[0].port,
487+
}}
488+
r.w.inject(updates)
489+
// Loop until the above update applies.
490+
for {
491+
ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond)
492+
if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); Code(err) == codes.DeadlineExceeded {
493+
break
494+
}
495+
time.Sleep(10 * time.Millisecond)
496+
}
497+
// Issue 2 RPCs which should be completed with error status once cc is closed.
498+
var wg sync.WaitGroup
499+
wg.Add(2)
500+
go func() {
501+
defer wg.Done()
502+
var reply string
503+
if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil {
504+
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
505+
}
506+
}()
507+
go func() {
508+
defer wg.Done()
509+
var reply string
510+
time.Sleep(5 * time.Millisecond)
511+
if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil {
512+
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
513+
}
514+
}()
515+
time.Sleep(5 * time.Millisecond)
516+
cc.Close()
517+
wg.Wait()
518+
}
519+
520+
func TestPickFirstOrderAllServerUp(t *testing.T) {
521+
// Start 3 servers on 3 ports.
522+
numServers := 3
523+
servers, r := startServers(t, numServers, math.MaxUint32)
524+
for i := 0; i < numServers; i++ {
525+
defer servers[i].stop()
526+
}
527+
cc, err := Dial("foo.bar.com", WithBalancer(pickFirstBalancer(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
528+
if err != nil {
529+
t.Fatalf("Failed to create ClientConn: %v", err)
530+
}
531+
defer cc.Close()
532+
// Add servers[1] and [2] to the service discovery.
533+
u := &naming.Update{
534+
Op: naming.Add,
535+
Addr: "localhost:" + servers[1].port,
536+
}
537+
r.w.inject([]*naming.Update{u})
538+
539+
u = &naming.Update{
540+
Op: naming.Add,
541+
Addr: "localhost:" + servers[2].port,
542+
}
543+
r.w.inject([]*naming.Update{u})
544+
545+
// Loop until all 3 servers are up
546+
checkServerUp(t, servers[0])
547+
checkServerUp(t, servers[1])
548+
checkServerUp(t, servers[2])
549+
550+
// Check the incoming RPCs served in server[0]
551+
req := "port"
552+
var reply string
553+
for i := 0; i < 20; i++ {
554+
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
555+
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
556+
}
557+
time.Sleep(10 * time.Millisecond)
558+
}
559+
560+
// Delete server[0] in the balancer, the incoming RPCs served in server[1]
561+
// For test addrconn, close server[0] instead
562+
u = &naming.Update{
563+
Op: naming.Delete,
564+
Addr: "localhost:" + servers[0].port,
565+
}
566+
r.w.inject([]*naming.Update{u})
567+
// Loop until it changes to server[1]
568+
for {
569+
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
570+
break
571+
}
572+
time.Sleep(10 * time.Millisecond)
573+
}
574+
for i := 0; i < 20; i++ {
575+
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port {
576+
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
577+
}
578+
time.Sleep(10 * time.Millisecond)
579+
}
580+
581+
// Add server[0] back to the balancer, the incoming RPCs served in server[1]
582+
// Add is append operation, the order of Notify now is {server[1].port server[2].port server[0].port}
583+
u = &naming.Update{
584+
Op: naming.Add,
585+
Addr: "localhost:" + servers[0].port,
586+
}
587+
r.w.inject([]*naming.Update{u})
588+
for i := 0; i < 20; i++ {
589+
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port {
590+
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
591+
}
592+
time.Sleep(10 * time.Millisecond)
593+
}
594+
595+
// Delete server[1] in the balancer, the incoming RPCs served in server[2]
596+
u = &naming.Update{
597+
Op: naming.Delete,
598+
Addr: "localhost:" + servers[1].port,
599+
}
600+
r.w.inject([]*naming.Update{u})
601+
for {
602+
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[2].port {
603+
break
604+
}
605+
time.Sleep(1 * time.Second)
606+
}
607+
for i := 0; i < 20; i++ {
608+
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[2].port {
609+
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port)
610+
}
611+
time.Sleep(10 * time.Millisecond)
612+
}
613+
614+
// Delete server[2] in the balancer, the incoming RPCs served in server[0]
615+
u = &naming.Update{
616+
Op: naming.Delete,
617+
Addr: "localhost:" + servers[2].port,
618+
}
619+
r.w.inject([]*naming.Update{u})
620+
for {
621+
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
622+
break
623+
}
624+
time.Sleep(1 * time.Second)
625+
}
626+
for i := 0; i < 20; i++ {
627+
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
628+
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port)
629+
}
630+
time.Sleep(10 * time.Millisecond)
631+
}
632+
}
633+
634+
func TestPickFirstOrderOneServerDown(t *testing.T) {
635+
// Start 3 servers on 3 ports.
636+
numServers := 3
637+
servers, r := startServers(t, numServers, math.MaxUint32)
638+
for i := 0; i < numServers; i++ {
639+
defer servers[i].stop()
640+
}
641+
cc, err := Dial("foo.bar.com", WithBalancer(pickFirstBalancer(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
642+
if err != nil {
643+
t.Fatalf("Failed to create ClientConn: %v", err)
644+
}
645+
defer cc.Close()
646+
// Add servers[1] and [2] to the service discovery.
647+
u := &naming.Update{
648+
Op: naming.Add,
649+
Addr: "localhost:" + servers[1].port,
650+
}
651+
r.w.inject([]*naming.Update{u})
652+
653+
u = &naming.Update{
654+
Op: naming.Add,
655+
Addr: "localhost:" + servers[2].port,
656+
}
657+
r.w.inject([]*naming.Update{u})
658+
659+
// Loop until all 3 servers are up
660+
checkServerUp(t, servers[0])
661+
checkServerUp(t, servers[1])
662+
checkServerUp(t, servers[2])
663+
664+
// Check the incoming RPCs served in server[0]
665+
req := "port"
666+
var reply string
667+
for i := 0; i < 20; i++ {
668+
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
669+
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
670+
}
671+
time.Sleep(10 * time.Millisecond)
672+
}
673+
674+
// server[0] down, incoming RPCs served in server[1], but the order of Notify still remains
675+
// {server[0] server[1] server[2]}
676+
servers[0].stop()
677+
// Loop until it changes to server[1]
678+
for {
679+
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
680+
break
681+
}
682+
time.Sleep(10 * time.Millisecond)
683+
}
684+
for i := 0; i < 20; i++ {
685+
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port {
686+
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
687+
}
688+
time.Sleep(10 * time.Millisecond)
689+
}
690+
691+
// up the server[0] back, the incoming RPCs served in server[1]
692+
p, _ := strconv.Atoi(servers[0].port)
693+
servers[0] = newTestServer()
694+
go servers[0].start(t, p, math.MaxUint32)
695+
servers[0].wait(t, 2*time.Second)
696+
checkServerUp(t, servers[0])
697+
698+
for i := 0; i < 20; i++ {
699+
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[1].port {
700+
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
701+
}
702+
time.Sleep(10 * time.Millisecond)
703+
}
704+
705+
// Delete server[1] in the balancer, the incoming RPCs served in server[0]
706+
u = &naming.Update{
707+
Op: naming.Delete,
708+
Addr: "localhost:" + servers[1].port,
709+
}
710+
r.w.inject([]*naming.Update{u})
711+
for {
712+
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[0].port {
713+
break
714+
}
715+
time.Sleep(1 * time.Second)
716+
}
717+
for i := 0; i < 20; i++ {
718+
if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err == nil || ErrorDesc(err) != servers[0].port {
719+
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
720+
}
721+
time.Sleep(10 * time.Millisecond)
722+
}
723+
}
724+
725+
func TestPickFirstOneAddressRemoval(t *testing.T) {
726+
// Start 2 servers.
727+
numServers := 2
728+
servers, r := startServers(t, numServers, math.MaxUint32)
729+
for i := 0; i < numServers; i++ {
730+
defer servers[i].stop()
731+
}
732+
cc, err := Dial("localhost:"+servers[0].port, WithBalancer(pickFirstBalancer(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
733+
if err != nil {
734+
t.Fatalf("Failed to create ClientConn: %v", err)
735+
}
736+
defer cc.Close()
737+
// Add servers[1] to the service discovery.
738+
var updates []*naming.Update
739+
updates = append(updates, &naming.Update{
740+
Op: naming.Add,
741+
Addr: "localhost:" + servers[1].port,
742+
})
743+
r.w.inject(updates)
744+
745+
// Create a new cc to Loop until servers[1] is up
746+
checkServerUp(t, servers[0])
747+
checkServerUp(t, servers[1])
748+
749+
var wg sync.WaitGroup
750+
numRPC := 100
751+
sleepDuration := 10 * time.Millisecond
752+
wg.Add(1)
753+
go func() {
754+
time.Sleep(sleepDuration)
755+
// After sleepDuration, delete server[0].
756+
var updates []*naming.Update
757+
updates = append(updates, &naming.Update{
758+
Op: naming.Delete,
759+
Addr: "localhost:" + servers[0].port,
760+
})
761+
r.w.inject(updates)
762+
wg.Done()
763+
}()
764+
765+
// All non-failfast RPCs should not fail because there's at least one connection available.
766+
for i := 0; i < numRPC; i++ {
767+
wg.Add(1)
768+
go func() {
769+
var reply string
770+
time.Sleep(sleepDuration)
771+
// After sleepDuration, invoke RPC.
772+
// server[0] is removed around the same time to make it racy between balancer and gRPC internals.
773+
if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil {
774+
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
775+
}
776+
wg.Done()
777+
}()
778+
}
779+
wg.Wait()
780+
}

0 commit comments

Comments
 (0)