Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats: Add RPC event for blocking for a picker update #6422

Merged
merged 17 commits into from
Jul 18, 2023
2 changes: 1 addition & 1 deletion clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (cc *ClientConn) exitIdleMode() error {
cc.idlenessState = ccIdlenessStateExitingIdle
exitedIdle := false
if cc.blockingpicker == nil {
cc.blockingpicker = newPickerWrapper()
cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers)
} else {
cc.blockingpicker.exitIdleMode()
exitedIdle = true
Expand Down
32 changes: 25 additions & 7 deletions picker_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,26 @@ import (
"google.golang.org/grpc/internal/channelz"
istatus "google.golang.org/grpc/internal/status"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)

// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
// actions and unblock when there's a picker update.
type pickerWrapper struct {
mu sync.Mutex
done bool
idle bool
blockingCh chan struct{}
picker balancer.Picker
mu sync.Mutex
done bool
idle bool
blockingCh chan struct{}
picker balancer.Picker
statsHandlers []stats.Handler // to record blocking picker calls
}

func newPickerWrapper() *pickerWrapper {
return &pickerWrapper{blockingCh: make(chan struct{})}
func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper {
return &pickerWrapper{
blockingCh: make(chan struct{}),
statsHandlers: statsHandlers,
}
}

// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
Expand Down Expand Up @@ -95,6 +100,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
var ch chan struct{}

var lastPickErr error

for {
pw.mu.Lock()
if pw.done {
Expand Down Expand Up @@ -129,6 +135,18 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
continue
}

// If the channel is set, it means that the pick call had to wait for a
// new picker at some point. Either there was no picker and it received
// a new one, or a picker errored with ErrNoSubConnAvailable or errored
// with failfast set to false, which will trigger a continue to the next
// iteration. In that second case the only way it gets to this codeblock
// is to receive a new picker either in UpdateState or the select above.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is confusing because a new picker is always received by UpdateState, not by the select. Maybe this sentence is: "In the latter case, we only reach this when there is a new picker."

And isn't this true in either of the two cases? Basically, we only reach this point when: 1. there is a picker immediately on the first iteration, or 2. we have a new picker available. That's why we grab the picker on the following lines. The if ch != nil filters out the first case where we never needed to block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I guess the language I had is technically incorrect, but I was going for it either UpdateState() before the next iteration or it UpdateState() while blocking on the <-ch if the new one didn't come before the for. Yeah, the first case (either it's there, or it's not and we block) gets there, with it being there already not triggering conditional, and not being there triggering conditional. However, I think that it is more easily implied by conditional, but let me reword the comment and see if you like it more technically.

if ch != nil {
for _, sh := range pw.statsHandlers {
sh.HandleRPC(ctx, &stats.PickerUpdated{})
}
}

ch = pw.blockingCh
p := pw.picker
pw.mu.Unlock()
Expand Down
10 changes: 5 additions & 5 deletions picker_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (p *testingPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error
}

func (s) TestBlockingPickTimeout(t *testing.T) {
bp := newPickerWrapper()
bp := newPickerWrapper(nil)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, _, err := bp.pick(ctx, true, balancer.PickInfo{}); status.Code(err) != codes.DeadlineExceeded {
Expand All @@ -75,7 +75,7 @@ func (s) TestBlockingPickTimeout(t *testing.T) {
}

func (s) TestBlockingPick(t *testing.T) {
bp := newPickerWrapper()
bp := newPickerWrapper(nil)
// All goroutines should block because picker is nil in bp.
var finishedCount uint64
for i := goroutineCount; i > 0; i-- {
Expand All @@ -94,7 +94,7 @@ func (s) TestBlockingPick(t *testing.T) {
}

func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
bp := newPickerWrapper()
bp := newPickerWrapper(nil)
var finishedCount uint64
bp.updatePicker(&testingPicker{err: balancer.ErrNoSubConnAvailable, maxCalled: goroutineCount})
// All goroutines should block because picker returns no subConn available.
Expand All @@ -114,7 +114,7 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
}

func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
bp := newPickerWrapper()
bp := newPickerWrapper(nil)
bp.updatePicker(&testingPicker{err: balancer.ErrTransientFailure, maxCalled: goroutineCount})
var finishedCount uint64
// All goroutines should block because picker returns transientFailure and
Expand All @@ -135,7 +135,7 @@ func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
}

func (s) TestBlockingPickSCNotReady(t *testing.T) {
bp := newPickerWrapper()
bp := newPickerWrapper(nil)
bp.updatePicker(&testingPicker{sc: testSCNotReady, maxCalled: goroutineCount})
var finishedCount uint64
// All goroutines should block because subConn is not ready.
Expand Down
4 changes: 3 additions & 1 deletion stats/opencensus/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,11 @@ func populateSpan(ctx context.Context, rs stats.RPCStats, ti *traceInfo) {
trace.BoolAttribute("Client", rs.Client),
trace.BoolAttribute("FailFast", rs.FailFast),
)
case *stats.PickerUpdated:
span.Annotate(nil, "Delayed LB pick complete")
case *stats.InPayload:
// message id - "must be calculated as two different counters starting
// from 1 one for sent messages and one for received messages."
// from one for sent messages and one for received messages."
mi := atomic.AddUint32(&ti.countRecvMsg, 1)
span.AddMessageReceiveEvent(int64(mi), int64(rs.Length), int64(rs.CompressedLength))
case *stats.OutPayload:
Expand Down
10 changes: 10 additions & 0 deletions stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ func (s *Begin) IsClient() bool { return s.Client }

func (s *Begin) isRPCStats() {}

// PickerUpdated indicates that the LB policy provided a new picker while the
// RPC was waiting for one.
type PickerUpdated struct{}

// IsClient indicates if the stats information is from client side. Only Client
// Side interfaces with a Picker, thus always returns true.
func (*PickerUpdated) IsClient() bool { return true }

func (*PickerUpdated) isRPCStats() {}

// InPayload contains the information for an incoming payload.
type InPayload struct {
// Client is true if this InPayload is from client side.
Expand Down
165 changes: 165 additions & 0 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"flag"
"fmt"
Expand All @@ -44,6 +45,8 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
Expand All @@ -62,6 +65,7 @@ import (
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/grpc/tap"
Expand All @@ -82,6 +86,7 @@ const defaultHealthService = "grpc.health.v1.Health"

func init() {
channelz.TurnOn()
balancer.Register(triggerRPCBlockPickerBalancerBuilder{})
}

type s struct {
Expand Down Expand Up @@ -6362,3 +6367,163 @@ func (s) TestGlobalBinaryLoggingOptions(t *testing.T) {
t.Fatalf("want 8 server side binary logging events, got %v", ssbl.mml.events)
}
}

type statsHandlerRecordEvents struct {
mu sync.Mutex
s []stats.RPCStats
}

func (*statsHandlerRecordEvents) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
return ctx
}
func (h *statsHandlerRecordEvents) HandleRPC(_ context.Context, s stats.RPCStats) {
h.mu.Lock()
defer h.mu.Unlock()
h.s = append(h.s, s)
}
func (*statsHandlerRecordEvents) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
return ctx
}
func (*statsHandlerRecordEvents) HandleConn(context.Context, stats.ConnStats) {}

type triggerRPCBlockPicker struct {
pickDone func()
}

func (bp *triggerRPCBlockPicker) Pick(pi balancer.PickInfo) (balancer.PickResult, error) {
bp.pickDone()
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}

const name = "triggerRPCBlockBalancer"

type triggerRPCBlockPickerBalancerBuilder struct{}

func (triggerRPCBlockPickerBalancerBuilder) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &triggerRPCBlockBalancer{
blockingPickerDone: grpcsync.NewEvent(),
ClientConn: cc,
}
// round_robin child to complete balancer tree with a usable leaf policy and
// have RPCs actually work.
builder := balancer.Get(roundrobin.Name)
rr := builder.Build(b, bOpts)
if rr == nil {
// Shouldn't happen, defensive programming.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: generally, commenting a panic with "this shouldn't happen" isn't helpful. Obviously it shouldn't happen or else we wouldn't have made it panic. You can say why it shouldn't happen if it's interesting, or just leave the comment out if it's obvious or if the panic explains it (which is better than having a comment explaining it).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, whoops, this was left over from when this wasn't an explicit panic. Deleted comment.

panic("round robin builder returned nil")
}
b.Balancer = rr
return b
}

func (triggerRPCBlockPickerBalancerBuilder) ParseConfig(json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return &bpbConfig{}, nil
}

func (triggerRPCBlockPickerBalancerBuilder) Name() string {
return name
}

type bpbConfig struct {
serviceconfig.LoadBalancingConfig
}

type triggerRPCBlockBalancer struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment: // triggerRPCBlockBalancer uses a child RR balancer, but blocks all UpdateState calls from it until the first Pick call. That first pick returns ErrNoSubConnAvailable to make the RPC block and trigger the appropriate stats handler callout. or something

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment, alongside a little more. Thanks for suggestion.

stateMu sync.Mutex
childState balancer.State

blockingPickerDone *grpcsync.Event
// embed a ClientConn to wrap only UpdateState() operation
balancer.ClientConn
// embed a Balancer to wrap only UpdateClientConnState() operation
balancer.Balancer
}

func (bpb *triggerRPCBlockBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
err := bpb.Balancer.UpdateClientConnState(balancer.ClientConnState{
ResolverState: s.ResolverState,
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not pass s directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to eat the config and send down nil, but I guess this gets nil as the config. Switched.

bpb.ClientConn.UpdateState(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &triggerRPCBlockPicker{
pickDone: func() {
bpb.blockingPickerDone.Fire()
bpb.stateMu.Lock()
defer bpb.stateMu.Unlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the lock be taken before firing the event here? Otherwise you might race between this and a call from the balancer, and do the UpdateState twice. It doesn't lead to a correctness issue, but it does seem wrong.

Copy link
Contributor Author

@zasweq zasweq Jul 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, right. Luckily it didn't lead to non determinism of the assertion of the test, because it triggers another for iteration from the continue from ErrNoSubConnAvailable and an extra UpdateState doesn't affect picker count. However, for the intention of this testing struct, it is incorrect in that I desired to send one UpdateState. Good catch, switched.

if bpb.childState.ConnectivityState == connectivity.Ready {
bpb.ClientConn.UpdateState(bpb.childState)
}
},
},
})
return err
}

func (bpb *triggerRPCBlockBalancer) UpdateState(state balancer.State) {
bpb.stateMu.Lock()
defer bpb.stateMu.Unlock()
bpb.childState = state
if bpb.blockingPickerDone.HasFired() { // guard first one to get a picker sending ErrNoSubConnAvailable first
if state.ConnectivityState == connectivity.Ready {
bpb.ClientConn.UpdateState(state) // after the first rr picker update, only forward once READY for deterministic picker counts
}
}
}

// TestRPCBlockingOnPickerStatsCall tests the emission of a stats handler call
// that represents the RPC had to block waiting for a new picker due to
// ErrNoSubConnAvailable being returned from the first picker call.
func (s) TestRPCBlockingOnPickerStatsCall(t *testing.T) {
sh := &statsHandlerRecordEvents{}
ss := &stubserver.StubServer{
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
}

if err := ss.StartServer(); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()

lbCfgJSON := `{
"loadBalancingConfig": [
{
"triggerRPCBlockBalancer": {}
}
]
}`

sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lbCfgJSON)
mr := manual.NewBuilderWithScheme("pickerupdatedbalancer")
defer mr.Close()
print("ss.Address when setting: ", ss.Address)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove or change to t.Log

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, sorry, deleted.

mr.InitialState(resolver.State{
Addresses: []resolver.Address{
{Addr: ss.Address},
},
ServiceConfig: sc,
})

cc, err := grpc.Dial(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithStatsHandler(sh), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
testServiceClient := testgrpc.NewTestServiceClient(cc)
if _, err := testServiceClient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
t.Fatalf("Unexpected error from UnaryCall: %v", err)
}

var pickerUpdatedCount uint
for _, stat := range sh.s {
if _, ok := stat.(*stats.PickerUpdated); ok {
pickerUpdatedCount++
}
}
if pickerUpdatedCount != 1 {
t.Fatalf("sh.pickerUpdated count: %v, want: %v", pickerUpdatedCount, 2)
}
}
4 changes: 4 additions & 0 deletions test/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,10 @@ func (*retryStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) conte
return ctx
}
func (h *retryStatsHandler) HandleRPC(_ context.Context, s stats.RPCStats) {
// these calls come in nondeterministically - so can just ignore
if _, ok := s.(*stats.PickerUpdated); ok {
return
}
h.mu.Lock()
h.s = append(h.s, s)
h.mu.Unlock()
Expand Down