-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stats: Add RPC event for blocking for a picker update #6422
Changes from 11 commits
942bdd9
dc4e6b8
1361dcc
e69d389
5ec681a
ff3dda4
906ed7d
e6d27af
00f02a5
a5d9ad2
624eaf2
9fa6112
4685c89
7a796d9
8e40bc1
e1a01c2
1daf196
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ import ( | |
"bytes" | ||
"context" | ||
"crypto/tls" | ||
"encoding/json" | ||
"errors" | ||
"flag" | ||
"fmt" | ||
|
@@ -44,6 +45,8 @@ import ( | |
"golang.org/x/net/http2" | ||
"golang.org/x/net/http2/hpack" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/balancer" | ||
"google.golang.org/grpc/balancer/roundrobin" | ||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/connectivity" | ||
"google.golang.org/grpc/credentials" | ||
|
@@ -62,6 +65,7 @@ import ( | |
"google.golang.org/grpc/peer" | ||
"google.golang.org/grpc/resolver" | ||
"google.golang.org/grpc/resolver/manual" | ||
"google.golang.org/grpc/serviceconfig" | ||
"google.golang.org/grpc/stats" | ||
"google.golang.org/grpc/status" | ||
"google.golang.org/grpc/tap" | ||
|
@@ -82,6 +86,7 @@ const defaultHealthService = "grpc.health.v1.Health" | |
|
||
func init() { | ||
channelz.TurnOn() | ||
balancer.Register(triggerRPCBlockPickerBalancerBuilder{}) | ||
} | ||
|
||
type s struct { | ||
|
@@ -6362,3 +6367,160 @@ func (s) TestGlobalBinaryLoggingOptions(t *testing.T) { | |
t.Fatalf("want 8 server side binary logging events, got %v", ssbl.mml.events) | ||
} | ||
} | ||
|
||
type statsHandlerRecordEvents struct { | ||
mu sync.Mutex | ||
s []stats.RPCStats | ||
} | ||
|
||
func (*statsHandlerRecordEvents) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { | ||
return ctx | ||
} | ||
func (h *statsHandlerRecordEvents) HandleRPC(_ context.Context, s stats.RPCStats) { | ||
h.mu.Lock() | ||
defer h.mu.Unlock() | ||
h.s = append(h.s, s) | ||
} | ||
func (*statsHandlerRecordEvents) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { | ||
return ctx | ||
} | ||
func (*statsHandlerRecordEvents) HandleConn(context.Context, stats.ConnStats) {} | ||
|
||
type triggerRPCBlockPicker struct { | ||
pickDone func() | ||
} | ||
|
||
func (bp *triggerRPCBlockPicker) Pick(pi balancer.PickInfo) (balancer.PickResult, error) { | ||
bp.pickDone() | ||
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable | ||
} | ||
|
||
const name = "triggerRPCBlockBalancer" | ||
|
||
type triggerRPCBlockPickerBalancerBuilder struct{} | ||
|
||
func (triggerRPCBlockPickerBalancerBuilder) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { | ||
b := &triggerRPCBlockBalancer{ | ||
blockingPickerDone: grpcsync.NewEvent(), | ||
ClientConn: cc, | ||
} | ||
// round_robin child to complete balancer tree with a usable leaf policy and | ||
// have RPCs actually work. | ||
builder := balancer.Get(roundrobin.Name) | ||
rr := builder.Build(b, bOpts) | ||
if rr == nil { | ||
// Shouldn't happen, defensive programming. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: generally, commenting a panic with "this shouldn't happen" isn't helpful. Obviously it shouldn't happen or else we wouldn't have made it panic. You can say why it shouldn't happen if it's interesting, or just leave the comment out if it's obvious or if the panic explains it (which is better than having a comment explaining it). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, whoops, this was left over from when this wasn't an explicit panic. Deleted comment. |
||
panic("round robin builder returned nil") | ||
} | ||
b.Balancer = rr | ||
return b | ||
} | ||
|
||
func (triggerRPCBlockPickerBalancerBuilder) ParseConfig(json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { | ||
return &bpbConfig{}, nil | ||
} | ||
|
||
func (triggerRPCBlockPickerBalancerBuilder) Name() string { | ||
return name | ||
} | ||
|
||
type bpbConfig struct { | ||
serviceconfig.LoadBalancingConfig | ||
} | ||
|
||
type triggerRPCBlockBalancer struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comment: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added comment, alongside a little more. Thanks for suggestion. |
||
stateMu sync.Mutex | ||
childState balancer.State | ||
|
||
blockingPickerDone *grpcsync.Event | ||
// embed a ClientConn to wrap only UpdateState() operation | ||
balancer.ClientConn | ||
// embed a Balancer to wrap only UpdateClientConnState() operation | ||
balancer.Balancer | ||
} | ||
|
||
func (bpb *triggerRPCBlockBalancer) UpdateClientConnState(s balancer.ClientConnState) error { | ||
err := bpb.Balancer.UpdateClientConnState(balancer.ClientConnState{ | ||
ResolverState: s.ResolverState, | ||
}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not pass There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wanted to eat the config and send down nil, but I guess this gets nil as the config. Switched. |
||
bpb.ClientConn.UpdateState(balancer.State{ | ||
ConnectivityState: connectivity.Connecting, | ||
Picker: &triggerRPCBlockPicker{ | ||
pickDone: func() { | ||
bpb.blockingPickerDone.Fire() | ||
bpb.stateMu.Lock() | ||
cs := bpb.childState | ||
bpb.stateMu.Unlock() | ||
bpb.ClientConn.UpdateState(cs) | ||
}, | ||
}, | ||
}) | ||
return err | ||
} | ||
|
||
func (bpb *triggerRPCBlockBalancer) UpdateState(state balancer.State) { | ||
bpb.stateMu.Lock() | ||
bpb.childState = state | ||
bpb.stateMu.Unlock() | ||
if bpb.blockingPickerDone.HasFired() { // guard first one to get a picker sending ErrNoSubConnAvailable first | ||
bpb.ClientConn.UpdateState(state) // after the first rr picker update, cc will trigger more, so actually forward these | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This dance seems racy. Maybe it should be: func UpdateState() {
Lock(); defer Unlock()
if HasFired() { UpdateState() } else { childState = state }
}
pickDone: func() {
Lock(); defer Unlock()
Fire(); UpdateState(childState)
} ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, maybe we should not update the picker at all (except the initial one at Build or the first UpdateCCS call) until the state is READY. Otherwise we might get an unpredictable number of updates. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. I messed around with it to what it is now. |
||
} | ||
} | ||
|
||
// TestRPCBlockingOnPickerStatsCall tests the emission of a stats handler call | ||
// that represents the RPC had to block waiting for a new picker due to | ||
// ErrNoSubConnAvailable being returned from the first picker call. | ||
func (s) TestRPCBlockingOnPickerStatsCall(t *testing.T) { | ||
sh := &statsHandlerRecordEvents{} | ||
ss := &stubserver.StubServer{ | ||
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { | ||
return &testpb.SimpleResponse{}, nil | ||
}, | ||
} | ||
|
||
if err := ss.StartServer(); err != nil { | ||
t.Fatalf("Error starting endpoint server: %v", err) | ||
} | ||
defer ss.Stop() | ||
|
||
lbCfgJSON := `{ | ||
"loadBalancingConfig": [ | ||
{ | ||
"triggerRPCBlockBalancer": {} | ||
} | ||
] | ||
}` | ||
|
||
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lbCfgJSON) | ||
mr := manual.NewBuilderWithScheme("pickerupdatedbalancer") | ||
defer mr.Close() | ||
print("ss.Address when setting: ", ss.Address) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove or change to t.Log There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Whoops, sorry, deleted. |
||
mr.InitialState(resolver.State{ | ||
Addresses: []resolver.Address{ | ||
{Addr: ss.Address}, | ||
}, | ||
ServiceConfig: sc, | ||
}) | ||
|
||
cc, err := grpc.Dial(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithStatsHandler(sh), grpc.WithTransportCredentials(insecure.NewCredentials())) | ||
if err != nil { | ||
t.Fatalf("grpc.Dial() failed: %v", err) | ||
} | ||
defer cc.Close() | ||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||
defer cancel() | ||
testServiceClient := testgrpc.NewTestServiceClient(cc) | ||
if _, err := testServiceClient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { | ||
t.Fatalf("Unexpected error from UnaryCall: %v", err) | ||
} | ||
|
||
var pickerUpdatedCount uint | ||
for _, stat := range sh.s { | ||
if _, ok := stat.(*stats.PickerUpdated); ok { | ||
pickerUpdatedCount++ | ||
} | ||
} | ||
if pickerUpdatedCount != 2 { | ||
t.Fatalf("sh.pickerUpdated count: %v, want: %v", pickerUpdatedCount, 2) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a comment here or where
statsHandler
field is defined about what this is for and how this is used?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added comment on field: statsHandlers []stats.Handler // to record blocking picker calls