Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Oct 17, 2022
1 parent e1c66fa commit 4971070
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 23 deletions.
4 changes: 2 additions & 2 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,11 +381,11 @@ var ErrBadResolverState = errors.New("bad resolver state")
// SubConn to create producers when needed.
type ProducerBuilder interface {
// Build creates a Producer. The first parameter is always a
// grpc.InvokerStreamer (a type to allow creating RPCs/streams on the
// grpc.ClientConnInterface (a type to allow creating RPCs/streams on the
// associated SubConn), but is declared as interface{} to avoid a
// dependency cycle. Should also return a close function that will be
// called when all references to the Producer have been given up.
Build(grpcInvokerStreamer interface{}) (p Producer, close func())
Build(grpcClientConnInterface interface{}) (p Producer, close func())
}

// A Producer is a type shared among potentially many consumers. It is
Expand Down
16 changes: 7 additions & 9 deletions balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
return nil, err
}
acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*producerData)}
acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer)}
acbw.ac.mu.Lock()
ac.acbw = acbw
acbw.ac.mu.Unlock()
Expand Down Expand Up @@ -364,7 +364,7 @@ func (ccb *ccBalancerWrapper) Target() string {
type acBalancerWrapper struct {
mu sync.Mutex
ac *addrConn
producers map[balancer.ProducerBuilder]*producerData
producers map[balancer.ProducerBuilder]*refCountedProducer
}

func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
Expand Down Expand Up @@ -444,12 +444,10 @@ func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args i
return cs.RecvMsg(reply)
}

// producerData stores a producer, a ref counting mechanism, and a close
// function to be called when the producer no longer has any references.
type producerData struct {
type refCountedProducer struct {
producer balancer.Producer
refs int
close func()
refs int // number of current refs to the producer
close func() // underlying producer's close function
}

func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) {
Expand All @@ -461,14 +459,14 @@ func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (
if pData == nil {
// Not found; create a new one and add it to the producers map.
p, close := pb.Build(acbw)
pData = &producerData{producer: p, close: close}
pData = &refCountedProducer{producer: p, close: close}
acbw.producers[pb] = pData
}
// Account for this new reference.
pData.refs++

// Return a cleanup function wrapped in a sync.Once to remove this
// reference and delete the producerData from the map if the total
// reference and delete the refCountedProducer from the map if the total
// reference count goes to zero.
unref := func() {
acbw.mu.Lock()
Expand Down
27 changes: 15 additions & 12 deletions orca/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,17 @@ type OOBListener interface {

// OOBListenerOptions contains options to control how an OOBListener is called.
type OOBListenerOptions struct {
// How often to request the server to provide a load report. May be
// provided less frequently if the server requires a longer interval, or
// may be provided more frequently if another subscriber requests a shorter
// interval.
// ReportInterval specifies how often to request the server to provide a
// load report. May be provided less frequently if the server requires a
// longer interval, or may be provided more frequently if another
// subscriber requests a shorter interval.
ReportInterval time.Duration
}

// RegisterOOBListener registers an out-of-band load report listener on sc.
// Any OOBListener may only be registered once per subchannel at a time. The
// returned stop function must be called when no longer needed.
// returned stop function must be called when no longer needed. Do not
// register a single OOBListener more than once per SubConn.
func RegisterOOBListener(sc balancer.SubConn, l OOBListener, opts OOBListenerOptions) (stop func()) {
pr, close := sc.GetOrBuildProducer(producerBuilderSingleton)
p := pr.(*producer)
Expand All @@ -95,9 +96,11 @@ func RegisterOOBListener(sc balancer.SubConn, l OOBListener, opts OOBListenerOpt
type producer struct {
client v3orcaservicegrpc.OpenRcaServiceClient

ctx context.Context
cancel func()
closed *grpcsync.Event // fired when closure completes
closed *grpcsync.Event // fired when closure completes
// backoff is called between stream attempts to determine how long to delay
// to avoid overloading a server experiencing problems. The attempt count
// is incremented when stream errors occur and is reset when the stream
// reports a result.
backoff func(int) time.Duration

mu sync.Mutex
Expand Down Expand Up @@ -130,9 +133,9 @@ func (p *producer) unregisterListener(l OOBListener, interval time.Duration) {
func (p *producer) minInterval() time.Duration {
p.mu.Lock()
defer p.mu.Unlock()
min := time.Duration(-1)
for t := range p.intervals {
if min == time.Duration(-1) || t < min {
var min time.Duration
for i, t := range p.intervals {
if t < min || i == 0 {
min = t
}
}
Expand Down Expand Up @@ -197,7 +200,7 @@ func (p *producer) runStream(ctx context.Context) (resetBackoff bool, err error)
ReportInterval: durationpb.New(interval),
})
if err != nil {
return resetBackoff, err
return false, err
}

for {
Expand Down

0 comments on commit 4971070

Please sign in to comment.