Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions go/vt/servenv/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ func serveGRPC() {
}

// and serve on it
// NOTE: Before we call Serve(), all services must have registered themselves
// with "GRPCServer". This is the case because go/vt/servenv/run.go
// runs all OnRun() hooks after createGRPCServer() and before
// serveGRPC(). If this was not the case, the binary would crash with
// the error "grpc: Server.RegisterService after Server.Serve".
go GRPCServer.Serve(listener)

OnTermSync(func() {
Expand Down
14 changes: 7 additions & 7 deletions go/vt/throttler/grpcthrottlerclient/grpcthrottlerclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ import (
// TestThrottlerServer tests the gRPC implementation using a throttler client
// and server.
func TestThrottlerServer(t *testing.T) {
s, port := startGRPCServer(t)
// Use the global manager which is a singleton.
grpcthrottlerserver.StartServer(s, throttler.GlobalManager)
port := startGRPCServer(t, throttler.GlobalManager)

// Create a ThrottlerClient gRPC client to talk to the throttler.
client, err := factory(fmt.Sprintf("localhost:%v", port))
Expand All @@ -47,9 +46,8 @@ func TestThrottlerServer(t *testing.T) {
// TestThrottlerServerPanics tests the panic handling of the gRPC throttler
// server implementation.
func TestThrottlerServerPanics(t *testing.T) {
s, port := startGRPCServer(t)
// For testing the panic handling, use a fake Manager instead.
grpcthrottlerserver.StartServer(s, &throttlerclienttest.FakeManager{})
port := startGRPCServer(t, &throttlerclienttest.FakeManager{})

// Create a ThrottlerClient gRPC client to talk to the throttler.
client, err := factory(fmt.Sprintf("localhost:%v", port))
Expand All @@ -61,15 +59,17 @@ func TestThrottlerServerPanics(t *testing.T) {
throttlerclienttest.TestSuitePanics(t, client)
}

func startGRPCServer(t *testing.T) (*grpc.Server, int) {
func startGRPCServer(t *testing.T, m throttler.Manager) int {
// Listen on a random port.
listener, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("Cannot listen: %v", err)
}

// Create a gRPC server and listen on the port.
s := grpc.NewServer()
grpcthrottlerserver.RegisterServer(s, m)
// Call Serve() after our service has been registered. Otherwise, the test
// will fail with the error "grpc: Server.RegisterService after Server.Serve".
go s.Serve(listener)
return s, listener.Addr().(*net.TCPAddr).Port
return listener.Addr().(*net.TCPAddr).Port
}
6 changes: 3 additions & 3 deletions go/vt/throttler/grpcthrottlerserver/grpcthrottlerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,15 @@ func (s *Server) ResetConfiguration(_ context.Context, request *throttlerdata.Re
}, nil
}

// StartServer registers the Server instance with the gRPC server.
func StartServer(s *grpc.Server, m throttler.Manager) {
// RegisterServer registers a new throttler server instance with the gRPC server.
func RegisterServer(s *grpc.Server, m throttler.Manager) {
throttlerservice.RegisterThrottlerServer(s, NewServer(m))
}

func init() {
servenv.OnRun(func() {
if servenv.GRPCCheckServiceMap("throttler") {
StartServer(servenv.GRPCServer, throttler.GlobalManager)
RegisterServer(servenv.GRPCServer, throttler.GlobalManager)
}
})
}
2 changes: 1 addition & 1 deletion go/vt/wrangler/testlib/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func TestVtctlThrottlerCommands(t *testing.T) {
t.Fatalf("Cannot listen: %v", err)
}
s := grpc.NewServer()
grpcthrottlerserver.RegisterServer(s, throttler.GlobalManager)
go s.Serve(listener)
grpcthrottlerserver.StartServer(s, throttler.GlobalManager)

addr := fmt.Sprintf("localhost:%v", listener.Addr().(*net.TCPAddr).Port)

Expand Down