From 5f90c54ee3b13d64c84c45150f0e73adf4e14499 Mon Sep 17 00:00:00 2001 From: Sara Bee <855595+doeg@users.noreply.github.com> Date: Wed, 16 Mar 2022 12:21:53 -0400 Subject: [PATCH 01/17] Add WaitForReady to vtctldclient interface Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com> --- go/vt/vtctl/grpcvtctldclient/client.go | 8 ++++++++ go/vt/vtctl/localvtctldclient/client.go | 4 ++++ go/vt/vtctl/vtctldclient/client.go | 8 +++++++- 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/go/vt/vtctl/grpcvtctldclient/client.go b/go/vt/vtctl/grpcvtctldclient/client.go index e688e998da6..8007341ae78 100644 --- a/go/vt/vtctl/grpcvtctldclient/client.go +++ b/go/vt/vtctl/grpcvtctldclient/client.go @@ -19,6 +19,8 @@ limitations under the License. package grpcvtctldclient import ( + "context" + "google.golang.org/grpc" "vitess.io/vitess/go/vt/grpcclient" @@ -69,6 +71,7 @@ func NewWithDialOpts(addr string, failFast grpcclient.FailFast, opts ...grpc.Dia }, nil } +// Close is part of the vtctldclient.VtctldClient interface. func (client *gRPCVtctldClient) Close() error { err := client.cc.Close() if err == nil { @@ -78,6 +81,11 @@ func (client *gRPCVtctldClient) Close() error { return err } +// WaitForReady is part of the vtctldclient.VtctldClient interface. +func (client *gRPCVtctldClient) WaitForReady(ctx context.Context) error { + return nil +} + func init() { vtctldclient.Register("grpc", gRPCVtctldClientFactory) } diff --git a/go/vt/vtctl/localvtctldclient/client.go b/go/vt/vtctl/localvtctldclient/client.go index b870247fe07..3f91a162abc 100644 --- a/go/vt/vtctl/localvtctldclient/client.go +++ b/go/vt/vtctl/localvtctldclient/client.go @@ -17,6 +17,7 @@ limitations under the License. package localvtctldclient import ( + "context" "errors" "sync" @@ -37,6 +38,9 @@ type localVtctldClient struct { // Close is part of the vtctldclient.VtctldClient interface. func (client *localVtctldClient) Close() error { return nil } +// WaitForReady is part of the vtctldclient.VtctldClient interface. +func (client *localVtctldClient) WaitForReady(ctx context.Context) error { return nil } + //go:generate -command localvtctldclient go run ../vtctldclient/codegen //go:generate localvtctldclient -targetpkg localvtctldclient -impl localVtctldClient -out client_gen.go -local diff --git a/go/vt/vtctl/vtctldclient/client.go b/go/vt/vtctl/vtctldclient/client.go index e064b8bd9ae..c5be65bd9a2 100644 --- a/go/vt/vtctl/vtctldclient/client.go +++ b/go/vt/vtctl/vtctldclient/client.go @@ -3,16 +3,22 @@ package vtctldclient import ( + "context" "fmt" "log" vtctlservicepb "vitess.io/vitess/go/vt/proto/vtctlservice" ) -// VtctldClient augments the vtctlservicepb.VtctlClient interface with io.Closer. type VtctldClient interface { vtctlservicepb.VtctldClient + + // Close augments the vtctlservicepb.VtctlClient interface with io.Closer. Close() error + + // WaitForReady waits until the connection to the vtctld is in a ready state, + // or until the context times out. + WaitForReady(ctx context.Context) error } // Factory is a function that creates new VtctldClients. From 11761ee5dd8042dd0d9435faec6d5dfca0e57450 Mon Sep 17 00:00:00 2001 From: Sara Bee <855595+doeg@users.noreply.github.com> Date: Wed, 16 Mar 2022 14:36:28 -0400 Subject: [PATCH 02/17] Implement WaitForReady in grpcvtctldclient Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com> --- go/vt/vtctl/grpcvtctldclient/client.go | 38 +++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/go/vt/vtctl/grpcvtctldclient/client.go b/go/vt/vtctl/grpcvtctldclient/client.go index 8007341ae78..6a22766d0ef 100644 --- a/go/vt/vtctl/grpcvtctldclient/client.go +++ b/go/vt/vtctl/grpcvtctldclient/client.go @@ -20,8 +20,10 @@ package grpcvtctldclient import ( "context" + "fmt" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/vtctl/grpcclientcommon" @@ -83,7 +85,41 @@ func (client *gRPCVtctldClient) Close() error { // WaitForReady is part of the vtctldclient.VtctldClient interface. func (client *gRPCVtctldClient) WaitForReady(ctx context.Context) error { - return nil + // The gRPC implementation of WaitForReady uses the gRPC Connectivity API + // See https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md + for { + select { + // A READY connection to the vtctld could not be established + // within the context timeout. The caller should close their + // existing connection and establish a new one. + case <-ctx.Done(): + return fmt.Errorf("gRPC connection wait time exceeded") + + // Wait to transition to READY state + default: + connState := client.cc.GetState() + + switch connState { + case connectivity.Ready: + return nil + + // Per https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md, + // a client that enters SHUTDOWN state never leave this state, and all new RPCs should + // fail immediately. So, we don't need to waste time by continuing to poll and can + // return an error immediately so that the caller can close the connection. + case connectivity.Shutdown: + return fmt.Errorf("gRPCVtctldClient in a SHUTDOWN state") + + // If the connection is IDLE, CONNECTING, or in a TRANSIENT_FAILURE mode, + // then we wait to see if it will transition to a READY state. + default: + if !client.cc.WaitForStateChange(ctx, connState) { + // If the client has failed to transition, fail so that the caller can close the connection. + return fmt.Errorf("failed to transition from state %s", connState) + } + } + } + } } func init() { From f5ec764a81335f143d465e2aaae960013c428ffb Mon Sep 17 00:00:00 2001 From: Sara Bee <855595+doeg@users.noreply.github.com> Date: Wed, 16 Mar 2022 14:37:28 -0400 Subject: [PATCH 03/17] Call WaitForReady in VTAdmin's vtctld proxy + add a test Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com> --- .../discovery/fakediscovery/discovery.go | 12 +++ go/vt/vtadmin/vtctldclient/proxy.go | 39 +++++++- go/vt/vtadmin/vtctldclient/proxy_test.go | 91 +++++++++++++++++++ 3 files changed, 138 insertions(+), 4 deletions(-) diff --git a/go/vt/vtadmin/cluster/discovery/fakediscovery/discovery.go b/go/vt/vtadmin/cluster/discovery/fakediscovery/discovery.go index 4e388839212..5f94d7a4769 100644 --- a/go/vt/vtadmin/cluster/discovery/fakediscovery/discovery.go +++ b/go/vt/vtadmin/cluster/discovery/fakediscovery/discovery.go @@ -60,6 +60,18 @@ func New() *Fake { } } +func (d *Fake) Clear() { + d.gates = &gates{ + byTag: map[string][]*vtadminpb.VTGate{}, + byName: map[string]*vtadminpb.VTGate{}, + } + + d.vtctlds = &vtctlds{ + byTag: map[string][]*vtadminpb.Vtctld{}, + byName: map[string]*vtadminpb.Vtctld{}, + } +} + // AddTaggedGates adds the given gates to the discovery fake, associating each // gate with each tag. To tag different gates with multiple tags, call multiple // times with the same gates but different tag slices. Gates are uniquely diff --git a/go/vt/vtadmin/vtctldclient/proxy.go b/go/vt/vtadmin/vtctldclient/proxy.go index 46f12d2da73..fea5786de8b 100644 --- a/go/vt/vtadmin/vtctldclient/proxy.go +++ b/go/vt/vtadmin/vtctldclient/proxy.go @@ -26,6 +26,7 @@ import ( "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/grpcclient" + "vitess.io/vitess/go/vt/orchestrator/external/golib/log" "vitess.io/vitess/go/vt/vtadmin/cluster/discovery" "vitess.io/vitess/go/vt/vtadmin/debug" "vitess.io/vitess/go/vt/vtadmin/vtadminproto" @@ -104,14 +105,27 @@ func (vtctld *ClientProxy) Dial(ctx context.Context) error { if vtctld.VtctldClient != nil { if !vtctld.closed { - span.Annotate("is_noop", true) - span.Annotate("vtctld_host", vtctld.host) + // TODO add a flag for context timeout + waitCtx, waitCancel := context.WithTimeout(ctx, 2*time.Second) + defer waitCancel() - vtctld.lastPing = time.Now() + if err := vtctld.VtctldClient.WaitForReady(waitCtx); err == nil { + // Our cached connection is still open and ready, so we're good to go. + log.Infof("Using cached connection to vtctld %s\n", vtctld.host) - return nil + span.Annotate("is_noop", true) + span.Annotate("vtctld_host", vtctld.host) + + vtctld.lastPing = time.Now() + + return nil + } + // If WaitForReady returns an error, that indicates our cached connection + // is no longer valid. We fall through to close the cached connection, + // discover a new vtctld, and establish a new connection. } + log.Infof("Closing stale connection to vtctld %s\n", vtctld.host) span.Annotate("is_stale", true) // close before reopen. this is safe to call on an already-closed client. @@ -120,6 +134,7 @@ func (vtctld *ClientProxy) Dial(ctx context.Context) error { } } + log.Infof("Discovering vtctld to dial...\n") addr, err := vtctld.discovery.DiscoverVtctldAddr(ctx, nil) if err != nil { return fmt.Errorf("error discovering vtctld to dial: %w", err) @@ -139,11 +154,27 @@ func (vtctld *ClientProxy) Dial(ctx context.Context) error { opts = append(opts, grpc.WithPerRPCCredentials(vtctld.creds)) } + log.Infof("Discovered vtctld %s; attempting to establish gRPC connection...\n", addr) client, err := vtctld.DialFunc(addr, grpcclient.FailFast(false), opts...) if err != nil { return err } + log.Infof("Established gRPC connection to vtctld %s; waiting to transition to READY...\n", addr) + // TODO use flag + waitCtx, waitCancel := context.WithTimeout(ctx, 2*time.Second) + defer waitCancel() + + if err := client.WaitForReady(waitCtx); err != nil { + // If the gRPC connection does not transition to a READY state within the context timeout, + // then return an error. The onus to redial (or not) is on the caller of the Dial function. + // As an enhancement, we could update this Dial function to try redialing the discovered vtctld + // a few times with a backoff before giving up. + log.Infof("Could not transition to READY state for gRPC connection to %s: %s\n", addr, err.Error()) + return err + } + + log.Infof("Established gRPC connection to vtctld %s\n", addr) vtctld.dialedAt = time.Now() vtctld.host = addr vtctld.VtctldClient = client diff --git a/go/vt/vtadmin/vtctldclient/proxy_test.go b/go/vt/vtadmin/vtctldclient/proxy_test.go index d256a030a54..b926920c009 100644 --- a/go/vt/vtadmin/vtctldclient/proxy_test.go +++ b/go/vt/vtadmin/vtctldclient/proxy_test.go @@ -20,6 +20,7 @@ import ( "context" "net" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -67,3 +68,93 @@ func TestDial(t *testing.T) { assert.NoError(t, err) assert.Equal(t, listener.Addr().String(), proxy.host) } + +// TestRedial tests that vtadmin-api is able to recover from a lost connection to +// a vtctld by rediscovering and redialing a new one. +func TestRedial(t *testing.T) { + // Initialize vtctld #1 + listener1, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer listener1.Close() + + vtctld1 := &fakeVtctld{} + server1 := grpc.NewServer() + + go server1.Serve(listener1) + defer server1.Stop() + + vtctlservicepb.RegisterVtctlServer(server1, vtctld1) + + // Initialize vtctld #2 + listener2, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer listener2.Close() + + vtctld2 := &fakeVtctld{} + server2 := grpc.NewServer() + + go server2.Serve(listener2) + defer server2.Stop() + + vtctlservicepb.RegisterVtctlServer(server2, vtctld2) + + // Register both vtctlds with VTAdmin + disco := fakediscovery.New() + disco.AddTaggedVtctlds(nil, &vtadminpb.Vtctld{ + Hostname: listener1.Addr().String(), + }, &vtadminpb.Vtctld{ + Hostname: listener2.Addr().String(), + }) + + proxy := New(&Config{ + Cluster: &vtadminpb.Cluster{ + Id: "test", + Name: "testcluster", + }, + Discovery: disco, + }) + + // We don't have a vtctld host until we call Dial + require.Empty(t, proxy.host) + + // Check for a successful connection to whichever vtctld we discover first. + err = proxy.Dial(context.Background()) + assert.NoError(t, err) + + // vtadmin's fakediscovery package discovers vtctlds in random order. Rather + // than force some cumbersome sequential logic, we can just do a switcheroo + // here in the test to determine our "current" and (expected) "next" vtctlds. + var currentVtctld *grpc.Server + var nextAddr string + + switch proxy.host { + case listener1.Addr().String(): + currentVtctld = server1 + nextAddr = listener2.Addr().String() + + case listener2.Addr().String(): + currentVtctld = server2 + nextAddr = listener1.Addr().String() + default: + t.Fatalf("invalid proxy host %s", proxy.host) + } + + // Remove the shut down vtctld from VTAdmin's service discovery (clumsily). + // Otherwise, when redialing, we may redial the vtctld that we just shut down. + // FIXME make this nicer + disco.Clear() + disco.AddTaggedVtctlds(nil, &vtadminpb.Vtctld{ + Hostname: nextAddr, + }) + + // Force an ungraceful shutdown of the gRPC server to which we're connected + currentVtctld.Stop() + + // FIXME use WaitForReady instead + time.Sleep(2 * time.Second) + + // Finally, check that dial + establish a connection to the remaining vtctld. + err = proxy.Dial(context.Background()) + assert.NoError(t, err) + assert.Equal(t, nextAddr, proxy.host) +} From f758a1541ceb5bab008d13c3f450722dd5edd1af Mon Sep 17 00:00:00 2001 From: Sara Bee <855595+doeg@users.noreply.github.com> Date: Wed, 16 Mar 2022 14:47:08 -0400 Subject: [PATCH 04/17] Nit: typo Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com> --- go/vt/vtctl/grpcvtctldclient/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vtctl/grpcvtctldclient/client.go b/go/vt/vtctl/grpcvtctldclient/client.go index 6a22766d0ef..e55f08d81a2 100644 --- a/go/vt/vtctl/grpcvtctldclient/client.go +++ b/go/vt/vtctl/grpcvtctldclient/client.go @@ -104,7 +104,7 @@ func (client *gRPCVtctldClient) WaitForReady(ctx context.Context) error { return nil // Per https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md, - // a client that enters SHUTDOWN state never leave this state, and all new RPCs should + // a client that enters SHUTDOWN state never leaves this state, and all new RPCs should // fail immediately. So, we don't need to waste time by continuing to poll and can // return an error immediately so that the caller can close the connection. case connectivity.Shutdown: From 3e35df9208a02cadffaf79511104a7d970fe426e Mon Sep 17 00:00:00 2001 From: Sara Bee <855595+doeg@users.noreply.github.com> Date: Wed, 16 Mar 2022 16:50:55 -0400 Subject: [PATCH 05/17] Add a grpc-connectivity-timeout config flag Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com> --- go/vt/vtadmin/vtctldclient/config.go | 5 +++++ go/vt/vtadmin/vtctldclient/proxy.go | 6 ++---- go/vt/vtadmin/vtctldclient/proxy_test.go | 3 ++- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/go/vt/vtadmin/vtctldclient/config.go b/go/vt/vtadmin/vtctldclient/config.go index 2ff3ad1d34f..3abbcdf21ad 100644 --- a/go/vt/vtadmin/vtctldclient/config.go +++ b/go/vt/vtadmin/vtctldclient/config.go @@ -18,6 +18,7 @@ package vtctldclient import ( "fmt" + "time" "github.com/spf13/pflag" @@ -36,6 +37,8 @@ type Config struct { CredentialsPath string Cluster *vtadminpb.Cluster + + ConnectivityTimeout time.Duration } // Parse returns a new config with the given cluster and discovery, after @@ -61,6 +64,8 @@ func Parse(cluster *vtadminpb.Cluster, disco discovery.Discovery, args []string) func (c *Config) Parse(args []string) error { fs := pflag.NewFlagSet("", pflag.ContinueOnError) + fs.DurationVar(&c.ConnectivityTimeout, "grpc-connectivity-timeout", 2*time.Second, "The maximum duration to wait for a vtctld gRPC connection to be established.") + credentialsTmplStr := fs.String("credentials-path-tmpl", "", "Go template used to specify a path to a credentials file, which is a json file containing "+ "a Username and Password. Templates are given the context of the vtctldclient.Config, "+ diff --git a/go/vt/vtadmin/vtctldclient/proxy.go b/go/vt/vtadmin/vtctldclient/proxy.go index fea5786de8b..062acc753b9 100644 --- a/go/vt/vtadmin/vtctldclient/proxy.go +++ b/go/vt/vtadmin/vtctldclient/proxy.go @@ -105,8 +105,7 @@ func (vtctld *ClientProxy) Dial(ctx context.Context) error { if vtctld.VtctldClient != nil { if !vtctld.closed { - // TODO add a flag for context timeout - waitCtx, waitCancel := context.WithTimeout(ctx, 2*time.Second) + waitCtx, waitCancel := context.WithTimeout(ctx, vtctld.cfg.ConnectivityTimeout) defer waitCancel() if err := vtctld.VtctldClient.WaitForReady(waitCtx); err == nil { @@ -161,8 +160,7 @@ func (vtctld *ClientProxy) Dial(ctx context.Context) error { } log.Infof("Established gRPC connection to vtctld %s; waiting to transition to READY...\n", addr) - // TODO use flag - waitCtx, waitCancel := context.WithTimeout(ctx, 2*time.Second) + waitCtx, waitCancel := context.WithTimeout(ctx, vtctld.cfg.ConnectivityTimeout) defer waitCancel() if err := client.WaitForReady(waitCtx); err != nil { diff --git a/go/vt/vtadmin/vtctldclient/proxy_test.go b/go/vt/vtadmin/vtctldclient/proxy_test.go index b926920c009..b96a872b860 100644 --- a/go/vt/vtadmin/vtctldclient/proxy_test.go +++ b/go/vt/vtadmin/vtctldclient/proxy_test.go @@ -111,7 +111,8 @@ func TestRedial(t *testing.T) { Id: "test", Name: "testcluster", }, - Discovery: disco, + Discovery: disco, + ConnectivityTimeout: 2 * time.Second, }) // We don't have a vtctld host until we call Dial From f078bcb9c1f0735fd2121c41a5127d42f6c69708 Mon Sep 17 00:00:00 2001 From: Sara Bee <855595+doeg@users.noreply.github.com> Date: Wed, 16 Mar 2022 17:17:41 -0400 Subject: [PATCH 06/17] Use WaitForReady instead of time.Sleep to detect client shutdown in TestRedial Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com> --- go/vt/vtadmin/vtctldclient/proxy_test.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/go/vt/vtadmin/vtctldclient/proxy_test.go b/go/vt/vtadmin/vtctldclient/proxy_test.go index b96a872b860..899c8ec124f 100644 --- a/go/vt/vtadmin/vtctldclient/proxy_test.go +++ b/go/vt/vtadmin/vtctldclient/proxy_test.go @@ -151,8 +151,17 @@ func TestRedial(t *testing.T) { // Force an ungraceful shutdown of the gRPC server to which we're connected currentVtctld.Stop() - // FIXME use WaitForReady instead - time.Sleep(2 * time.Second) + // Wait for the client connection to shut down. If we redial too quickly, + // we get into a race condition with gRPC's internal retry logic. + // (Using WaitForReady here _does_ expose more function internals than is ideal for a unit test, + // but it's far less flaky than using time.Sleep.) + for { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + if err = proxy.VtctldClient.WaitForReady(ctx); err != nil { + break + } + } // Finally, check that dial + establish a connection to the remaining vtctld. err = proxy.Dial(context.Background()) From 1a4fcf9c895575b67b5993d5f727400d49bd40aa Mon Sep 17 00:00:00 2001 From: Sara Bee <855595+doeg@users.noreply.github.com> Date: Thu, 17 Mar 2022 12:41:23 -0400 Subject: [PATCH 07/17] Fix TestDial by adding ConnectivityTimeout option Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com> --- go/vt/vtadmin/vtctldclient/proxy_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go/vt/vtadmin/vtctldclient/proxy_test.go b/go/vt/vtadmin/vtctldclient/proxy_test.go index 899c8ec124f..b457ab598de 100644 --- a/go/vt/vtadmin/vtctldclient/proxy_test.go +++ b/go/vt/vtadmin/vtctldclient/proxy_test.go @@ -58,7 +58,8 @@ func TestDial(t *testing.T) { Id: "test", Name: "testcluster", }, - Discovery: disco, + Discovery: disco, + ConnectivityTimeout: 2 * time.Second, }) // We don't have a vtctld host until we call Dial From bed75c2f5471d91001ece638d9027459a43fe206 Mon Sep 17 00:00:00 2001 From: Sara Bee <855595+doeg@users.noreply.github.com> Date: Thu, 17 Mar 2022 12:45:13 -0400 Subject: [PATCH 08/17] Dedupe test logic with initVtctlServer helper Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com> --- go/vt/vtadmin/vtctldclient/proxy_test.go | 33 ++++++++++++------------ 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/go/vt/vtadmin/vtctldclient/proxy_test.go b/go/vt/vtadmin/vtctldclient/proxy_test.go index b457ab598de..b1e67db6c83 100644 --- a/go/vt/vtadmin/vtctldclient/proxy_test.go +++ b/go/vt/vtadmin/vtctldclient/proxy_test.go @@ -35,16 +35,25 @@ type fakeVtctld struct { vtctlservicepb.VtctlServer } -func TestDial(t *testing.T) { +func initVtctlServer() (net.Listener, *grpc.Server, error) { listener, err := net.Listen("tcp", "127.0.0.1:0") - require.NoError(t, err) - - defer listener.Close() + if err != nil { + return nil, nil, err + } vtctld := &fakeVtctld{} server := grpc.NewServer() vtctlservicepb.RegisterVtctlServer(server, vtctld) + return listener, server, err +} + +func TestDial(t *testing.T) { + listener, server, err := initVtctlServer() + require.NoError(t, err) + + defer listener.Close() + go server.Serve(listener) defer server.Stop() @@ -74,31 +83,23 @@ func TestDial(t *testing.T) { // a vtctld by rediscovering and redialing a new one. func TestRedial(t *testing.T) { // Initialize vtctld #1 - listener1, err := net.Listen("tcp", "127.0.0.1:0") + listener1, server1, err := initVtctlServer() require.NoError(t, err) - defer listener1.Close() - vtctld1 := &fakeVtctld{} - server1 := grpc.NewServer() + defer listener1.Close() go server1.Serve(listener1) defer server1.Stop() - vtctlservicepb.RegisterVtctlServer(server1, vtctld1) - // Initialize vtctld #2 - listener2, err := net.Listen("tcp", "127.0.0.1:0") + listener2, server2, err := initVtctlServer() require.NoError(t, err) - defer listener2.Close() - vtctld2 := &fakeVtctld{} - server2 := grpc.NewServer() + defer listener2.Close() go server2.Serve(listener2) defer server2.Stop() - vtctlservicepb.RegisterVtctlServer(server2, vtctld2) - // Register both vtctlds with VTAdmin disco := fakediscovery.New() disco.AddTaggedVtctlds(nil, &vtadminpb.Vtctld{ From 9fc1c2e5b0718348dad56e170d9a20f2cd6b1205 Mon Sep 17 00:00:00 2001 From: Sara Bee <855595+doeg@users.noreply.github.com> Date: Thu, 17 Mar 2022 15:36:53 -0400 Subject: [PATCH 09/17] Add WaitForReady tests in grpcvtctldclient/client_test.go Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com> --- go/vt/vtctl/grpcvtctldclient/client.go | 10 +++++-- go/vt/vtctl/grpcvtctldclient/client_test.go | 30 ++++++++++++++++++++- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/go/vt/vtctl/grpcvtctldclient/client.go b/go/vt/vtctl/grpcvtctldclient/client.go index e55f08d81a2..b6236dbf179 100644 --- a/go/vt/vtctl/grpcvtctldclient/client.go +++ b/go/vt/vtctl/grpcvtctldclient/client.go @@ -20,6 +20,7 @@ package grpcvtctldclient import ( "context" + "errors" "fmt" "google.golang.org/grpc" @@ -32,6 +33,11 @@ import ( vtctlservicepb "vitess.io/vitess/go/vt/proto/vtctlservice" ) +var ( + ErrConnectionShutdown = errors.New("gRPCVtctldClient in a SHUTDOWN state") + ErrConnectionTimeout = errors.New("gRPC connection wait time exceeded") +) + const connClosedMsg = "grpc: the client connection is closed" type gRPCVtctldClient struct { @@ -93,7 +99,7 @@ func (client *gRPCVtctldClient) WaitForReady(ctx context.Context) error { // within the context timeout. The caller should close their // existing connection and establish a new one. case <-ctx.Done(): - return fmt.Errorf("gRPC connection wait time exceeded") + return ErrConnectionTimeout // Wait to transition to READY state default: @@ -108,7 +114,7 @@ func (client *gRPCVtctldClient) WaitForReady(ctx context.Context) error { // fail immediately. So, we don't need to waste time by continuing to poll and can // return an error immediately so that the caller can close the connection. case connectivity.Shutdown: - return fmt.Errorf("gRPCVtctldClient in a SHUTDOWN state") + return ErrConnectionShutdown // If the connection is IDLE, CONNECTING, or in a TRANSIENT_FAILURE mode, // then we wait to see if it will transition to a READY state. diff --git a/go/vt/vtctl/grpcvtctldclient/client_test.go b/go/vt/vtctl/grpcvtctldclient/client_test.go index 9f6654cc49d..f1a39d056ed 100644 --- a/go/vt/vtctl/grpcvtctldclient/client_test.go +++ b/go/vt/vtctl/grpcvtctldclient/client_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package grpcvtctldclient_test +package grpcvtctldclient import ( "context" @@ -35,6 +35,34 @@ import ( vtctlservicepb "vitess.io/vitess/go/vt/proto/vtctlservice" ) +func TestWaitForReady(t *testing.T) { + ts := memorytopo.NewServer("cell1") + vtctld := testutil.NewVtctldServerWithTabletManagerClient(t, ts, nil, func(ts *topo.Server) vtctlservicepb.VtctldServer { + return grpcvtctldserver.NewVtctldServer(ts) + }) + + testutil.WithTestServer(t, vtctld, func(t *testing.T, client vtctldclient.VtctldClient) { + ctx := context.Background() + err := client.WaitForReady(ctx) + assert.NoError(t, err) + }) +} + +func TestWaitForReadyShutdown(t *testing.T) { + ts := memorytopo.NewServer("cell1") + vtctld := testutil.NewVtctldServerWithTabletManagerClient(t, ts, nil, func(ts *topo.Server) vtctlservicepb.VtctldServer { + return grpcvtctldserver.NewVtctldServer(ts) + }) + + testutil.WithTestServer(t, vtctld, func(t *testing.T, client vtctldclient.VtctldClient) { + client.Close() + ctx := context.Background() + err := client.WaitForReady(ctx) + + assert.Error(t, ErrConnectionShutdown, err) + }) +} + func TestFindAllShardsInKeyspace(t *testing.T) { ctx := context.Background() ts := memorytopo.NewServer("cell1") From 9215e3996919c5acbbc0c536fed42f3492f60043 Mon Sep 17 00:00:00 2001 From: Sara Bee <855595+doeg@users.noreply.github.com> Date: Thu, 17 Mar 2022 15:43:18 -0400 Subject: [PATCH 10/17] Nits: wording + grammar Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com> --- go/vt/vtadmin/vtctldclient/config.go | 2 +- go/vt/vtadmin/vtctldclient/proxy_test.go | 5 ++--- go/vt/vtctl/grpcvtctldclient/client.go | 6 +++--- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/go/vt/vtadmin/vtctldclient/config.go b/go/vt/vtadmin/vtctldclient/config.go index 3abbcdf21ad..5706cf48c34 100644 --- a/go/vt/vtadmin/vtctldclient/config.go +++ b/go/vt/vtadmin/vtctldclient/config.go @@ -64,7 +64,7 @@ func Parse(cluster *vtadminpb.Cluster, disco discovery.Discovery, args []string) func (c *Config) Parse(args []string) error { fs := pflag.NewFlagSet("", pflag.ContinueOnError) - fs.DurationVar(&c.ConnectivityTimeout, "grpc-connectivity-timeout", 2*time.Second, "The maximum duration to wait for a vtctld gRPC connection to be established.") + fs.DurationVar(&c.ConnectivityTimeout, "grpc-connectivity-timeout", 2*time.Second, "The maximum duration to wait for a gRPC connection to be established to the vtctld.") credentialsTmplStr := fs.String("credentials-path-tmpl", "", "Go template used to specify a path to a credentials file, which is a json file containing "+ diff --git a/go/vt/vtadmin/vtctldclient/proxy_test.go b/go/vt/vtadmin/vtctldclient/proxy_test.go index b1e67db6c83..a3484bc8022 100644 --- a/go/vt/vtadmin/vtctldclient/proxy_test.go +++ b/go/vt/vtadmin/vtctldclient/proxy_test.go @@ -144,7 +144,6 @@ func TestRedial(t *testing.T) { // Remove the shut down vtctld from VTAdmin's service discovery (clumsily). // Otherwise, when redialing, we may redial the vtctld that we just shut down. - // FIXME make this nicer disco.Clear() disco.AddTaggedVtctlds(nil, &vtadminpb.Vtctld{ Hostname: nextAddr, @@ -153,7 +152,7 @@ func TestRedial(t *testing.T) { // Force an ungraceful shutdown of the gRPC server to which we're connected currentVtctld.Stop() - // Wait for the client connection to shut down. If we redial too quickly, + // Wait for the client connection to shut down. (If we redial too quickly, // we get into a race condition with gRPC's internal retry logic. // (Using WaitForReady here _does_ expose more function internals than is ideal for a unit test, // but it's far less flaky than using time.Sleep.) @@ -165,7 +164,7 @@ func TestRedial(t *testing.T) { } } - // Finally, check that dial + establish a connection to the remaining vtctld. + // Finally, check that we discover, dial + establish a new connection to the remaining vtctld. err = proxy.Dial(context.Background()) assert.NoError(t, err) assert.Equal(t, nextAddr, proxy.host) diff --git a/go/vt/vtctl/grpcvtctldclient/client.go b/go/vt/vtctl/grpcvtctldclient/client.go index b6236dbf179..2dc57a546fd 100644 --- a/go/vt/vtctl/grpcvtctldclient/client.go +++ b/go/vt/vtctl/grpcvtctldclient/client.go @@ -110,9 +110,9 @@ func (client *gRPCVtctldClient) WaitForReady(ctx context.Context) error { return nil // Per https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md, - // a client that enters SHUTDOWN state never leaves this state, and all new RPCs should - // fail immediately. So, we don't need to waste time by continuing to poll and can - // return an error immediately so that the caller can close the connection. + // a client that enters the SHUTDOWN state never leaves this state, and all new RPCs should + // fail immediately. Further polling is futile, in other words, and so we + // return an error immediately to indicate that the caller can close the connection. case connectivity.Shutdown: return ErrConnectionShutdown From d8318d6bf33653733b31185a59ae08a75a7965e2 Mon Sep 17 00:00:00 2001 From: Sara Bee <855595+doeg@users.noreply.github.com> Date: Fri, 18 Mar 2022 13:20:25 -0400 Subject: [PATCH 11/17] Don't early return from Dial on Close errors Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com> --- go/vt/vtadmin/vtctldclient/proxy.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/go/vt/vtadmin/vtctldclient/proxy.go b/go/vt/vtadmin/vtctldclient/proxy.go index 062acc753b9..8467d8a9558 100644 --- a/go/vt/vtadmin/vtctldclient/proxy.go +++ b/go/vt/vtadmin/vtctldclient/proxy.go @@ -129,7 +129,10 @@ func (vtctld *ClientProxy) Dial(ctx context.Context) error { // close before reopen. this is safe to call on an already-closed client. if err := vtctld.Close(); err != nil { - return fmt.Errorf("error closing possibly-stale connection before re-dialing: %w", err) + // Even if the client connection does not shut down cleanly, we don't want to block + // Dial from discovering a new vtctld. This makes VTAdmin's dialer more resilient, + // but, as a caveat, it _can_ potentially leak improperly-closed gRPC connections. + log.Errorf("error closing possibly-stale connection before re-dialing: %w", err) } } @@ -190,12 +193,16 @@ func (vtctld *ClientProxy) Close() error { } err := vtctld.VtctldClient.Close() + + // Mark the vtctld connection as "closed" from the proxy side even if + // the client connection does not shut down cleanly. This makes VTAdmin's dialer more resilient, + // but, as a caveat, it _can_ potentially leak improperly-closed gRPC connections. + vtctld.closed = true + if err != nil { return err } - vtctld.closed = true - return nil } From 5742703aa7eabe3b88ac34c424ea59c4195066c0 Mon Sep 17 00:00:00 2001 From: Sara Bee <855595+doeg@users.noreply.github.com> Date: Fri, 18 Mar 2022 14:00:52 -0400 Subject: [PATCH 12/17] Import the correct logging framework >:( Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com> --- go/vt/vtadmin/vtctldclient/proxy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vtadmin/vtctldclient/proxy.go b/go/vt/vtadmin/vtctldclient/proxy.go index 8467d8a9558..ffbacae59ab 100644 --- a/go/vt/vtadmin/vtctldclient/proxy.go +++ b/go/vt/vtadmin/vtctldclient/proxy.go @@ -26,7 +26,7 @@ import ( "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/grpcclient" - "vitess.io/vitess/go/vt/orchestrator/external/golib/log" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/vtadmin/cluster/discovery" "vitess.io/vitess/go/vt/vtadmin/debug" "vitess.io/vitess/go/vt/vtadmin/vtadminproto" From 0ae0c1a60998ca788093363aa58a47ea7d691cc6 Mon Sep 17 00:00:00 2001 From: Sara Bee <855595+doeg@users.noreply.github.com> Date: Mon, 21 Mar 2022 09:23:40 -0400 Subject: [PATCH 13/17] Remove extraneous log statements Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com> --- go/vt/vtadmin/vtctldclient/proxy.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/go/vt/vtadmin/vtctldclient/proxy.go b/go/vt/vtadmin/vtctldclient/proxy.go index ffbacae59ab..10798255a17 100644 --- a/go/vt/vtadmin/vtctldclient/proxy.go +++ b/go/vt/vtadmin/vtctldclient/proxy.go @@ -110,8 +110,6 @@ func (vtctld *ClientProxy) Dial(ctx context.Context) error { if err := vtctld.VtctldClient.WaitForReady(waitCtx); err == nil { // Our cached connection is still open and ready, so we're good to go. - log.Infof("Using cached connection to vtctld %s\n", vtctld.host) - span.Annotate("is_noop", true) span.Annotate("vtctld_host", vtctld.host) @@ -124,7 +122,6 @@ func (vtctld *ClientProxy) Dial(ctx context.Context) error { // discover a new vtctld, and establish a new connection. } - log.Infof("Closing stale connection to vtctld %s\n", vtctld.host) span.Annotate("is_stale", true) // close before reopen. this is safe to call on an already-closed client. @@ -136,7 +133,6 @@ func (vtctld *ClientProxy) Dial(ctx context.Context) error { } } - log.Infof("Discovering vtctld to dial...\n") addr, err := vtctld.discovery.DiscoverVtctldAddr(ctx, nil) if err != nil { return fmt.Errorf("error discovering vtctld to dial: %w", err) @@ -156,13 +152,11 @@ func (vtctld *ClientProxy) Dial(ctx context.Context) error { opts = append(opts, grpc.WithPerRPCCredentials(vtctld.creds)) } - log.Infof("Discovered vtctld %s; attempting to establish gRPC connection...\n", addr) client, err := vtctld.DialFunc(addr, grpcclient.FailFast(false), opts...) if err != nil { return err } - log.Infof("Established gRPC connection to vtctld %s; waiting to transition to READY...\n", addr) waitCtx, waitCancel := context.WithTimeout(ctx, vtctld.cfg.ConnectivityTimeout) defer waitCancel() From fef9e982303f33f007aff1cf1de236d58690ae16 Mon Sep 17 00:00:00 2001 From: Sara Bee <855595+doeg@users.noreply.github.com> Date: Mon, 21 Mar 2022 11:02:54 -0400 Subject: [PATCH 14/17] Add defaultConnectivityTimeout var for use in unit tests Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com> --- go/vt/vtadmin/vtctldclient/config.go | 4 +++- go/vt/vtadmin/vtctldclient/config_test.go | 16 +++++++++------- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/go/vt/vtadmin/vtctldclient/config.go b/go/vt/vtadmin/vtctldclient/config.go index 5706cf48c34..1c9ff6af909 100644 --- a/go/vt/vtadmin/vtctldclient/config.go +++ b/go/vt/vtadmin/vtctldclient/config.go @@ -41,6 +41,8 @@ type Config struct { ConnectivityTimeout time.Duration } +var defaultConnectivityTimeout = 2 * time.Second + // Parse returns a new config with the given cluster and discovery, after // attempting to parse the command-line pflags into that Config. See // (*Config).Parse() for more details. @@ -64,7 +66,7 @@ func Parse(cluster *vtadminpb.Cluster, disco discovery.Discovery, args []string) func (c *Config) Parse(args []string) error { fs := pflag.NewFlagSet("", pflag.ContinueOnError) - fs.DurationVar(&c.ConnectivityTimeout, "grpc-connectivity-timeout", 2*time.Second, "The maximum duration to wait for a gRPC connection to be established to the vtctld.") + fs.DurationVar(&c.ConnectivityTimeout, "grpc-connectivity-timeout", defaultConnectivityTimeout, "The maximum duration to wait for a gRPC connection to be established to the vtctld.") credentialsTmplStr := fs.String("credentials-path-tmpl", "", "Go template used to specify a path to a credentials file, which is a json file containing "+ diff --git a/go/vt/vtadmin/vtctldclient/config_test.go b/go/vt/vtadmin/vtctldclient/config_test.go index 6ec50bc98dd..e90634b0072 100644 --- a/go/vt/vtadmin/vtctldclient/config_test.go +++ b/go/vt/vtadmin/vtctldclient/config_test.go @@ -50,10 +50,11 @@ func TestParse(t *testing.T) { require.NoError(t, err) expected := &Config{ - Cluster: nil, - Discovery: nil, - Credentials: nil, - CredentialsPath: "", + Cluster: nil, + Discovery: nil, + Credentials: nil, + CredentialsPath: "", + ConnectivityTimeout: defaultConnectivityTimeout, } assert.Equal(t, expected, cfg) }) @@ -88,9 +89,10 @@ func TestParse(t *testing.T) { Cluster: &vtadminpb.Cluster{ Name: "testcluster", }, - Discovery: nil, - Credentials: creds, - CredentialsPath: credsfile.Name(), + Discovery: nil, + Credentials: creds, + CredentialsPath: credsfile.Name(), + ConnectivityTimeout: defaultConnectivityTimeout, } assert.Equal(t, expected, cfg) From 5c8d16056d9c61bbaafc36ec26da7db4fabd9c8d Mon Sep 17 00:00:00 2001 From: Sara Bee <855595+doeg@users.noreply.github.com> Date: Mon, 21 Mar 2022 11:07:31 -0400 Subject: [PATCH 15/17] Add WaitForReady to fakevtctldclient Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com> --- go/vt/vtadmin/vtctldclient/fakevtctldclient/vtctldclient.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go/vt/vtadmin/vtctldclient/fakevtctldclient/vtctldclient.go b/go/vt/vtadmin/vtctldclient/fakevtctldclient/vtctldclient.go index c767cf797ff..d1471a4c5c5 100644 --- a/go/vt/vtadmin/vtctldclient/fakevtctldclient/vtctldclient.go +++ b/go/vt/vtadmin/vtctldclient/fakevtctldclient/vtctldclient.go @@ -71,6 +71,8 @@ type VtctldClient struct { // incorrectly. var _ vtctldclient.VtctldClient = (*VtctldClient)(nil) +func (fake *VtctldClient) WaitForReady(ctx context.Context) error { return nil } + // CreateKeyspace is part of the vtctldclient.VtctldClient interface. func (fake *VtctldClient) CreateKeyspace(ctx context.Context, req *vtctldatapb.CreateKeyspaceRequest, opts ...grpc.CallOption) (*vtctldatapb.CreateKeyspaceResponse, error) { if fake.CreateKeyspaceShouldErr { From afe15d9e076edd9569134ea44a97e1337567e245 Mon Sep 17 00:00:00 2001 From: Sara Bee <855595+doeg@users.noreply.github.com> Date: Mon, 21 Mar 2022 11:40:32 -0400 Subject: [PATCH 16/17] Use defaultConnectivityTimeout in proxy_test Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com> --- go/vt/vtadmin/vtctldclient/proxy_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vtadmin/vtctldclient/proxy_test.go b/go/vt/vtadmin/vtctldclient/proxy_test.go index a3484bc8022..3ad94e09d21 100644 --- a/go/vt/vtadmin/vtctldclient/proxy_test.go +++ b/go/vt/vtadmin/vtctldclient/proxy_test.go @@ -68,7 +68,7 @@ func TestDial(t *testing.T) { Name: "testcluster", }, Discovery: disco, - ConnectivityTimeout: 2 * time.Second, + ConnectivityTimeout: defaultConnectivityTimeout, }) // We don't have a vtctld host until we call Dial @@ -114,7 +114,7 @@ func TestRedial(t *testing.T) { Name: "testcluster", }, Discovery: disco, - ConnectivityTimeout: 2 * time.Second, + ConnectivityTimeout: defaultConnectivityTimeout, }) // We don't have a vtctld host until we call Dial From 92e7055970aa1d31cea0d5cfd3d15b11c3c5b866 Mon Sep 17 00:00:00 2001 From: Sara Bee <855595+doeg@users.noreply.github.com> Date: Tue, 22 Mar 2022 09:25:23 -0400 Subject: [PATCH 17/17] var defaultConnectivityTimeout -> const Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com> --- go/vt/vtadmin/vtctldclient/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vtadmin/vtctldclient/config.go b/go/vt/vtadmin/vtctldclient/config.go index 1c9ff6af909..63f96f1a606 100644 --- a/go/vt/vtadmin/vtctldclient/config.go +++ b/go/vt/vtadmin/vtctldclient/config.go @@ -41,7 +41,7 @@ type Config struct { ConnectivityTimeout time.Duration } -var defaultConnectivityTimeout = 2 * time.Second +const defaultConnectivityTimeout = 2 * time.Second // Parse returns a new config with the given cluster and discovery, after // attempting to parse the command-line pflags into that Config. See