Skip to content

Commit

Permalink
add service config pre-parsing support
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Mar 28, 2019
1 parent f1437f7 commit a73ba3e
Show file tree
Hide file tree
Showing 16 changed files with 345 additions and 171 deletions.
35 changes: 28 additions & 7 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package balancer

import (
"context"
"encoding/json"
"errors"
"net"
"strings"
Expand All @@ -39,7 +40,10 @@ var (
)

// Register registers the balancer builder to the balancer map. b.Name
// (lowercased) will be used as the name registered with this builder.
// (lowercased) will be used as the name registered with this builder. If the
// Builder implements Parser, Parse will be called when new service configs are
// received by the resolver, and the result will be provided to the Balancer in
// UpdateClientConnState.
//
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), and is not thread-safe. If multiple Balancers are
Expand Down Expand Up @@ -166,6 +170,14 @@ type Builder interface {
Name() string
}

// Parser parses service configs.
type Parser interface {
// Parse parses the JSON load balancer config provided into an internal
// form or returns an error if the config is invalid. For future
// compatibility reasons, unknown fields in the config should be ignored.
Parse(LoadBalancerConfig json.RawMessage) (interface{}, error)
}

// PickOptions contains addition information for the Pick operation.
type PickOptions struct {
// FullMethodName is the method name that NewClientStream() is called
Expand Down Expand Up @@ -259,7 +271,7 @@ type Balancer interface {
// non-nil error to gRPC.
//
// Deprecated: if V2Balancer is implemented by the Balancer,
// UpdateResolverState will be called instead.
// UpdateClientConnState will be called instead.
HandleResolvedAddrs([]resolver.Address, error)
// Close closes the balancer. The balancer is not required to call
// ClientConn.RemoveSubConn for its existing SubConns.
Expand All @@ -272,14 +284,23 @@ type SubConnState struct {
// TODO: add last connection error
}

// ClientConnState describes the state of a ClientConn relevant to the
// balancer.
type ClientConnState struct {
ResolverState resolver.State
// The parsed load balancer configuration returned by the builder's Parse
// method, if implemented.
BalancerConfig interface{}
}

// V2Balancer is defined for documentation purposes. If a Balancer also
// implements V2Balancer, its UpdateResolverState method will be called instead
// of HandleResolvedAddrs and its UpdateSubConnState will be called instead of
// HandleSubConnStateChange.
// implements V2Balancer, its UpdateClientConnState method will be called
// instead of HandleResolvedAddrs and its UpdateSubConnState will be called
// instead of HandleSubConnStateChange.
type V2Balancer interface {
// UpdateResolverState is called by gRPC when the state of the resolver
// UpdateClientConnState is called by gRPC when the state of the ClientConn
// changes.
UpdateResolverState(resolver.State)
UpdateClientConnState(ClientConnState)
// UpdateSubConnState is called by gRPC when the state of a SubConn
// changes.
UpdateSubConnState(SubConn, SubConnState)
Expand Down
10 changes: 5 additions & 5 deletions balancer/base/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
panic("not implemented")
}

func (b *baseBalancer) UpdateResolverState(s resolver.State) {
// TODO: handle s.Err (log if not nil) once implemented.
// TODO: handle s.ServiceConfig?
grpclog.Infoln("base.baseBalancer: got new resolver state: ", s)
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) {
// TODO: handle s.ResolverState.Err (log if not nil) once implemented.
// TODO: handle s.ResolverState.ServiceConfig?
grpclog.Infoln("base.baseBalancer: got new ClientConn state: ", s)
// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
addrsSet := make(map[resolver.Address]struct{})
for _, a := range s.Addresses {
for _, a := range s.ResolverState.Addresses {
addrsSet[a] = struct{}{}
if _, ok := b.subConns[a]; !ok {
// a is a new address (not existing in b.subConns).
Expand Down
2 changes: 1 addition & 1 deletion balancer/xds/edsbalancer/balancergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (bg *balancerGroup) handleResolvedAddrs(id string, addrs []resolver.Address
return
}
if ub, ok := b.(balancer.V2Balancer); ok {
ub.UpdateResolverState(resolver.State{Addresses: addrs})
ub.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{Addresses: addrs}})
} else {
b.HandleResolvedAddrs(addrs, nil)
}
Expand Down
24 changes: 12 additions & 12 deletions balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type ccBalancerWrapper struct {
cc *ClientConn
balancer balancer.Balancer
stateChangeQueue *scStateUpdateBuffer
resolverUpdateCh chan *resolver.State
ccUpdateCh chan *balancer.ClientConnState
done chan struct{}

mu sync.Mutex
Expand All @@ -99,7 +99,7 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
ccb := &ccBalancerWrapper{
cc: cc,
stateChangeQueue: newSCStateUpdateBuffer(),
resolverUpdateCh: make(chan *resolver.State, 1),
ccUpdateCh: make(chan *balancer.ClientConnState, 1),
done: make(chan struct{}),
subConns: make(map[*acBalancerWrapper]struct{}),
}
Expand All @@ -126,17 +126,17 @@ func (ccb *ccBalancerWrapper) watcher() {
} else {
ccb.balancer.HandleSubConnStateChange(t.sc, t.state)
}
case s := <-ccb.resolverUpdateCh:
case s := <-ccb.ccUpdateCh:
select {
case <-ccb.done:
ccb.balancer.Close()
return
default:
}
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
ub.UpdateResolverState(*s)
ub.UpdateClientConnState(*s)
} else {
ccb.balancer.HandleResolvedAddrs(s.Addresses, nil)
ccb.balancer.HandleResolvedAddrs(s.ResolverState.Addresses, nil)
}
case <-ccb.done:
}
Expand Down Expand Up @@ -178,23 +178,23 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co
})
}

func (ccb *ccBalancerWrapper) updateResolverState(s resolver.State) {
func (ccb *ccBalancerWrapper) updateClientConnState(s *balancer.ClientConnState) {
if ccb.cc.curBalancerName != grpclbName {
// Filter any grpclb addresses since we don't have the grpclb balancer.
for i := 0; i < len(s.Addresses); {
if s.Addresses[i].Type == resolver.GRPCLB {
copy(s.Addresses[i:], s.Addresses[i+1:])
s.Addresses = s.Addresses[:len(s.Addresses)-1]
for i := 0; i < len(s.ResolverState.Addresses); {
if s.ResolverState.Addresses[i].Type == resolver.GRPCLB {
copy(s.ResolverState.Addresses[i:], s.ResolverState.Addresses[i+1:])
s.ResolverState.Addresses = s.ResolverState.Addresses[:len(s.ResolverState.Addresses)-1]
continue
}
i++
}
}
select {
case <-ccb.resolverUpdateCh:
case <-ccb.ccUpdateCh:
default:
}
ccb.resolverUpdateCh <- &s
ccb.ccUpdateCh <- s
}

func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
Expand Down
21 changes: 15 additions & 6 deletions balancer_switching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/grpc/internal"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
)

var _ balancer.Builder = &magicalLB{}
Expand Down Expand Up @@ -148,12 +149,12 @@ func (s) TestSwitchBalancer(t *testing.T) {
t.Fatalf("check pickfirst returned non-nil error: %v", err)
}
// Switch to roundrobin.
cc.updateResolverState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`, Addresses: addrs})
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(`{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs})
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
// Switch to pickfirst.
cc.updateResolverState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "pick_first"}`, Addresses: addrs})
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(`{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs})
if err := checkPickFirst(cc, servers); err != nil {
t.Fatalf("check pickfirst returned non-nil error: %v", err)
}
Expand All @@ -180,7 +181,7 @@ func (s) TestBalancerDialOption(t *testing.T) {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
// Switch to pickfirst.
cc.updateResolverState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "pick_first"}`, Addresses: addrs})
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(`{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs})
// Balancer is still roundrobin.
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
Expand Down Expand Up @@ -336,7 +337,7 @@ func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) {
}
defer cc.Close()

sc := `{"loadBalancingPolicy": "round_robin"}`
sc := parseCfg(`{"loadBalancingPolicy": "round_robin"}`)

r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "backend"}}, ServiceConfig: sc})
var isRoundRobin bool
Expand Down Expand Up @@ -432,7 +433,7 @@ func (s) TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName)
}

sc := `{"loadBalancingPolicy": "round_robin"}`
sc := parseCfg(`{"loadBalancingPolicy": "round_robin"}`)
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: sc})
var isRoundRobin bool
for i := 0; i < 200; i++ {
Expand Down Expand Up @@ -509,8 +510,16 @@ func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) {
t.Fatalf("check pickfirst returned non-nil error: %v", err)
}
// Switch to roundrobin, and check against server[1] and server[2].
cc.updateResolverState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`, Addresses: addrs})
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(`{"loadBalancingPolicy": "round_robin"}`), Addresses: addrs})
if err := checkRoundRobin(cc, servers[1:]); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
}

func parseCfg(s string) serviceconfig.Config {
c, err := serviceconfig.Parse(s)
if err != nil {
panic(fmt.Sprintf("Error parsing config %q: %v", s, err))
}
return c
}
43 changes: 22 additions & 21 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,15 +467,11 @@ func (cc *ClientConn) updateResolverState(s resolver.State) error {
return nil
}

if !cc.dopts.disableServiceConfig && cc.scRaw != s.ServiceConfig {
if !cc.dopts.disableServiceConfig {
// New service config; apply it.
sc, err := parseServiceConfig(s.ServiceConfig)
if err != nil {
fmt.Println("error parsing config: ", err)
return err
if sc, ok := s.ServiceConfig.(ServiceConfig); ok {
cc.sc = sc
}
cc.scRaw = s.ServiceConfig
cc.sc = sc

if cc.sc.retryThrottling != nil {
newThrottler := &retryThrottler{
Expand All @@ -490,24 +486,29 @@ func (cc *ClientConn) updateResolverState(s resolver.State) error {
}
}

var balCfg interface{}
if cc.dopts.balancerBuilder == nil {
// Only look at balancer types and switch balancer if balancer dial
// option is not set.
var isGRPCLB bool
for _, a := range s.Addresses {
if a.Type == resolver.GRPCLB {
isGRPCLB = true
break
}
}
var newBalancerName string
// TODO: use new loadBalancerConfig field with appropriate priority.
if isGRPCLB {
newBalancerName = grpclbName
} else if cc.sc.LB != nil {
newBalancerName = *cc.sc.LB
if cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
balCfg = cc.sc.lbConfig.cfg
} else {
newBalancerName = PickFirstBalancerName
var isGRPCLB bool
for _, a := range s.Addresses {
if a.Type == resolver.GRPCLB {
isGRPCLB = true
break
}
}
if isGRPCLB {
newBalancerName = grpclbName
} else if cc.sc.LB != nil {
newBalancerName = *cc.sc.LB
} else {
newBalancerName = PickFirstBalancerName
}
}
cc.switchBalancer(newBalancerName)
} else if cc.balancerWrapper == nil {
Expand All @@ -516,7 +517,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State) error {
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
}

cc.balancerWrapper.updateResolverState(s)
cc.balancerWrapper.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
cc.firstResolveEvent.Fire()
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions clientconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@ func (s) TestResolverServiceConfigBeforeAddressNotPanic(t *testing.T) {

// SwitchBalancer before NewAddress. There was no balancer created, this
// makes sure we don't call close on nil balancerWrapper.
r.UpdateState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`}) // This should not panic.
r.UpdateState(resolver.State{ServiceConfig: parseCfg(`{"loadBalancingPolicy": "round_robin"}`)}) // This should not panic.

time.Sleep(time.Second) // Sleep to make sure the service config is handled by ClientConn.
}
Expand All @@ -889,7 +889,7 @@ func (s) TestResolverServiceConfigWhileClosingNotPanic(t *testing.T) {
}
// Send a new service config while closing the ClientConn.
go cc.Close()
go r.UpdateState(resolver.State{ServiceConfig: `{"loadBalancingPolicy": "round_robin"}`}) // This should not panic.
go r.UpdateState(resolver.State{ServiceConfig: parseCfg(`{"loadBalancingPolicy": "round_robin"}`)}) // This should not panic.
}
}

Expand Down Expand Up @@ -982,7 +982,7 @@ func (s) TestDisableServiceConfigOption(t *testing.T) {
t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
}
defer cc.Close()
r.UpdateState(resolver.State{ServiceConfig: `{
r.UpdateState(resolver.State{ServiceConfig: parseCfg(`{
"methodConfig": [
{
"name": [
Expand All @@ -994,7 +994,7 @@ func (s) TestDisableServiceConfigOption(t *testing.T) {
"waitForReady": true
}
]
}`})
}`)})
time.Sleep(1 * time.Second)
m := cc.GetMethodConfig("/foo/Bar")
if m.WaitForReady != nil {
Expand Down
3 changes: 3 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ var (
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
// default, but tests may wish to set it lower for convenience.
KeepaliveMinPingTime = 10 * time.Second
// ParseServiceConfig is a function to parse JSON service configs into
// opaque data structures.
ParseServiceConfig func(sc string) (interface{}, error)
)

// HealthChecker defines the signature of the client-side LB channel health checking function.
Expand Down
10 changes: 8 additions & 2 deletions resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
// All APIs in this package are experimental.
package resolver

import (
"google.golang.org/grpc/serviceconfig"
)

var (
// m is a map from scheme to resolver builder.
m = make(map[string]Builder)
Expand Down Expand Up @@ -100,8 +104,10 @@ type BuildOption struct {

// State contains the current Resolver state relevant to the ClientConn.
type State struct {
Addresses []Address // Resolved addresses for the target
ServiceConfig string // JSON representation of the service config
Addresses []Address // Resolved addresses for the target
// ServiceConfig is the parsed service config; obtained from
// serviceconfig.Parse.
ServiceConfig serviceconfig.Config

// TODO: add Err error
// TODO: add ParsedServiceConfig interface{}
Expand Down
Loading

0 comments on commit a73ba3e

Please sign in to comment.