From c046196c245990c9b258e96c2fa4641abbd56c43 Mon Sep 17 00:00:00 2001 From: Alain Jobart Date: Wed, 15 Jul 2015 12:53:52 -0700 Subject: [PATCH] Changing vtctl and vtworker remote API to also return an error. that way we don't depend on calling errFunc() *before* it actually should be called. --- .../fake_loggerevent_streamingclient.go | 10 +++------- .../fake_loggerevent_streamingclient_test.go | 10 +++++----- .../vtctl/fakevtctlclient/fakevtctlclient.go | 2 +- go/vt/vtctl/gorpcvtctlclient/client.go | 4 ++-- go/vt/vtctl/grpcvtctlclient/client.go | 6 +++--- go/vt/vtctl/vtctlclient/interface.go | 2 +- go/vt/vtctl/vtctlclient/wrapper.go | 4 ++-- go/vt/vtctl/vtctlclienttest/client.go | 15 ++++++++++++--- .../fakevtworkerclient/fakevtworkerclient.go | 2 +- go/vt/worker/grpcvtworkerclient/client.go | 6 +++--- .../worker/vtworkerclient/client_testsuite.go | 19 +++++++++++-------- go/vt/worker/vtworkerclient/interface.go | 2 +- go/vt/worker/vtworkerclient/wrapper.go | 4 ++-- go/vt/wrangler/testlib/vtctl_pipe.go | 5 ++++- 14 files changed, 51 insertions(+), 40 deletions(-) diff --git a/go/vt/vtctl/fakevtctlclient/fake_loggerevent_streamingclient.go b/go/vt/vtctl/fakevtctlclient/fake_loggerevent_streamingclient.go index 1443d7fd16a..77cf4d33d99 100644 --- a/go/vt/vtctl/fakevtctlclient/fake_loggerevent_streamingclient.go +++ b/go/vt/vtctl/fakevtctlclient/fake_loggerevent_streamingclient.go @@ -50,10 +50,10 @@ func (f *FakeLoggerEventStreamingClient) RegisterResult(args []string, output st } // StreamResult returns a channel which streams back a registered result as logging events. -func (f *FakeLoggerEventStreamingClient) StreamResult(args []string) (<-chan *logutil.LoggerEvent, func() error) { +func (f *FakeLoggerEventStreamingClient) StreamResult(args []string) (<-chan *logutil.LoggerEvent, func() error, error) { result, ok := f.results[fromSlice(args)] if !ok { - return nil, func() error { return fmt.Errorf("No response was registered for args: %v", args) } + return nil, nil, fmt.Errorf("No response was registered for args: %v", args) } stream := make(chan *logutil.LoggerEvent) @@ -71,9 +71,5 @@ func (f *FakeLoggerEventStreamingClient) StreamResult(args []string) (<-chan *lo close(stream) }() - if result.err != nil { - return stream, func() error { return result.err } - } - - return stream, func() error { return nil } + return stream, func() error { return result.err }, nil } diff --git a/go/vt/vtctl/fakevtctlclient/fake_loggerevent_streamingclient_test.go b/go/vt/vtctl/fakevtctlclient/fake_loggerevent_streamingclient_test.go index a1c181a0521..63a5fe7ab18 100644 --- a/go/vt/vtctl/fakevtctlclient/fake_loggerevent_streamingclient_test.go +++ b/go/vt/vtctl/fakevtctlclient/fake_loggerevent_streamingclient_test.go @@ -29,7 +29,7 @@ func verifyStreamOutputAndError(t *testing.T, wantErr error) { t.Fatal(err) } - stream, errFunc := fake.StreamResult(args) + stream, errFunc, err := fake.StreamResult(args) // Verify output and error. i := 0 @@ -53,16 +53,16 @@ func verifyStreamOutputAndError(t *testing.T, wantErr error) { func TestNoResultRegistered(t *testing.T) { fake := NewFakeLoggerEventStreamingClient() - stream, errFunc := fake.StreamResult([]string{"ListShardTablets", "test_keyspace/0"}) + stream, errFunc, err := fake.StreamResult([]string{"ListShardTablets", "test_keyspace/0"}) if stream != nil { t.Fatalf("No stream should have been returned because no matching result is registered.") } - if errFunc() == nil { + if errFunc != nil { t.Fatalf("Executing the command should fail because no matching result is registered.") } wantErr := "No response was registered for args: [ListShardTablets test_keyspace/0]" - if errFunc().Error() != wantErr { - t.Errorf("Wrong error for missing result was returned. got: '%v' want: '%v'", errFunc(), wantErr) + if err.Error() != wantErr { + t.Errorf("Wrong error for missing result was returned. got: '%v' want: '%v'", err, wantErr) } } diff --git a/go/vt/vtctl/fakevtctlclient/fakevtctlclient.go b/go/vt/vtctl/fakevtctlclient/fakevtctlclient.go index 5d27394e19d..16950a97258 100644 --- a/go/vt/vtctl/fakevtctlclient/fakevtctlclient.go +++ b/go/vt/vtctl/fakevtctlclient/fakevtctlclient.go @@ -31,7 +31,7 @@ func (f *FakeVtctlClient) FakeVtctlClientFactory(addr string, dialTimeout time.D } // ExecuteVtctlCommand is part of the vtctlclient interface. -func (f *FakeVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout, lockTimeout time.Duration) (<-chan *logutil.LoggerEvent, vtctlclient.ErrFunc) { +func (f *FakeVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout, lockTimeout time.Duration) (<-chan *logutil.LoggerEvent, vtctlclient.ErrFunc, error) { return f.FakeLoggerEventStreamingClient.StreamResult(args) } diff --git a/go/vt/vtctl/gorpcvtctlclient/client.go b/go/vt/vtctl/gorpcvtctlclient/client.go index 3ab0d711f3e..d805d38abe9 100644 --- a/go/vt/vtctl/gorpcvtctlclient/client.go +++ b/go/vt/vtctl/gorpcvtctlclient/client.go @@ -34,7 +34,7 @@ func goRPCVtctlClientFactory(addr string, dialTimeout time.Duration) (vtctlclien // ExecuteVtctlCommand is part of the VtctlClient interface. // Note the bson rpc version doesn't honor timeouts in the context // (but the server side will honor the actionTimeout) -func (client *goRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout, lockTimeout time.Duration) (<-chan *logutil.LoggerEvent, vtctlclient.ErrFunc) { +func (client *goRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout, lockTimeout time.Duration) (<-chan *logutil.LoggerEvent, vtctlclient.ErrFunc, error) { req := &gorpcproto.ExecuteVtctlCommandArgs{ Args: args, ActionTimeout: actionTimeout, @@ -42,7 +42,7 @@ func (client *goRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args [] } sr := make(chan *logutil.LoggerEvent, 10) c := client.rpcClient.StreamGo("VtctlServer.ExecuteVtctlCommand", req, sr) - return sr, func() error { return c.Error } + return sr, func() error { return c.Error }, nil } // Close is part of the VtctlClient interface diff --git a/go/vt/vtctl/grpcvtctlclient/client.go b/go/vt/vtctl/grpcvtctlclient/client.go index 8eb26b8f0f5..85e2a180414 100644 --- a/go/vt/vtctl/grpcvtctlclient/client.go +++ b/go/vt/vtctl/grpcvtctlclient/client.go @@ -38,7 +38,7 @@ func gRPCVtctlClientFactory(addr string, dialTimeout time.Duration) (vtctlclient } // ExecuteVtctlCommand is part of the VtctlClient interface -func (client *gRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout, lockTimeout time.Duration) (<-chan *logutil.LoggerEvent, vtctlclient.ErrFunc) { +func (client *gRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout, lockTimeout time.Duration) (<-chan *logutil.LoggerEvent, vtctlclient.ErrFunc, error) { query := &pb.ExecuteVtctlCommandRequest{ Args: args, ActionTimeout: int64(actionTimeout.Nanoseconds()), @@ -47,7 +47,7 @@ func (client *gRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []s stream, err := client.c.ExecuteVtctlCommand(ctx, query) if err != nil { - return nil, func() error { return err } + return nil, nil, err } results := make(chan *logutil.LoggerEvent, 1) @@ -67,7 +67,7 @@ func (client *gRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []s }() return results, func() error { return finalError - } + }, nil } // Close is part of the VtctlClient interface diff --git a/go/vt/vtctl/vtctlclient/interface.go b/go/vt/vtctl/vtctlclient/interface.go index 2671c6e630d..67766008f1c 100644 --- a/go/vt/vtctl/vtctlclient/interface.go +++ b/go/vt/vtctl/vtctlclient/interface.go @@ -25,7 +25,7 @@ type ErrFunc func() error type VtctlClient interface { // ExecuteVtctlCommand will execute the command remotely // NOTE: ErrFunc should only be checked after the returned channel was closed to avoid races. - ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout, lockTimeout time.Duration) (<-chan *logutil.LoggerEvent, ErrFunc) + ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout, lockTimeout time.Duration) (<-chan *logutil.LoggerEvent, ErrFunc, error) // Close will terminate the connection. This object won't be // used after this. diff --git a/go/vt/vtctl/vtctlclient/wrapper.go b/go/vt/vtctl/vtctlclient/wrapper.go index a57f3997d56..7ae51e23e16 100644 --- a/go/vt/vtctl/vtctlclient/wrapper.go +++ b/go/vt/vtctl/vtctlclient/wrapper.go @@ -29,8 +29,8 @@ func RunCommandAndWait(ctx context.Context, server string, args []string, dialTi // run the command ctx, cancel := context.WithTimeout(context.Background(), actionTimeout) defer cancel() - c, errFunc := client.ExecuteVtctlCommand(ctx, args, actionTimeout, lockWaitTimeout) - if err = errFunc(); err != nil { + c, errFunc, err := client.ExecuteVtctlCommand(ctx, args, actionTimeout, lockWaitTimeout) + if err != nil { return fmt.Errorf("Cannot execute remote command: %v", err) } diff --git a/go/vt/vtctl/vtctlclienttest/client.go b/go/vt/vtctl/vtctlclienttest/client.go index 01d4f313f3f..2d57047ef9f 100644 --- a/go/vt/vtctl/vtctlclienttest/client.go +++ b/go/vt/vtctl/vtctlclienttest/client.go @@ -55,7 +55,10 @@ func TestSuite(t *testing.T, ts topo.Server, client vtctlclient.VtctlClient) { } // run a command that's gonna return something on the log channel - logs, errFunc := client.ExecuteVtctlCommand(ctx, []string{"ListAllTablets", "cell1"}, 30*time.Second, 10*time.Second) + logs, errFunc, err := client.ExecuteVtctlCommand(ctx, []string{"ListAllTablets", "cell1"}, 30*time.Second, 10*time.Second) + if err != nil { + t.Fatalf("Remote error: %v", err) + } count := 0 for e := range logs { expected := "cell1-0000000001 test_keyspace master localhost:3333 localhost:3334 [tag: \"value\"]\n" @@ -73,7 +76,10 @@ func TestSuite(t *testing.T, ts topo.Server, client vtctlclient.VtctlClient) { } // run a command that's gonna fail - logs, errFunc = client.ExecuteVtctlCommand(ctx, []string{"ListAllTablets", "cell2"}, 30*time.Second, 10*time.Second) + logs, errFunc, err = client.ExecuteVtctlCommand(ctx, []string{"ListAllTablets", "cell2"}, 30*time.Second, 10*time.Second) + if err != nil { + t.Fatalf("Remote error: %v", err) + } if e, ok := <-logs; ok { t.Errorf("Got unexpected line for logs: %v", e.String()) } @@ -84,7 +90,10 @@ func TestSuite(t *testing.T, ts topo.Server, client vtctlclient.VtctlClient) { } // run a command that's gonna panic - logs, errFunc = client.ExecuteVtctlCommand(ctx, []string{"Panic"}, 30*time.Second, 10*time.Second) + logs, errFunc, err = client.ExecuteVtctlCommand(ctx, []string{"Panic"}, 30*time.Second, 10*time.Second) + if err != nil { + t.Fatalf("Remote error: %v", err) + } if e, ok := <-logs; ok { t.Errorf("Got unexpected line for logs: %v", e.String()) } diff --git a/go/vt/worker/fakevtworkerclient/fakevtworkerclient.go b/go/vt/worker/fakevtworkerclient/fakevtworkerclient.go index f828592286b..4973688075c 100644 --- a/go/vt/worker/fakevtworkerclient/fakevtworkerclient.go +++ b/go/vt/worker/fakevtworkerclient/fakevtworkerclient.go @@ -32,7 +32,7 @@ func (f *FakeVtworkerClient) FakeVtworkerClientFactory(addr string, dialTimeout } // ExecuteVtworkerCommand is part of the vtworkerclient interface. -func (f *FakeVtworkerClient) ExecuteVtworkerCommand(ctx context.Context, args []string) (<-chan *logutil.LoggerEvent, vtworkerclient.ErrFunc) { +func (f *FakeVtworkerClient) ExecuteVtworkerCommand(ctx context.Context, args []string) (<-chan *logutil.LoggerEvent, vtworkerclient.ErrFunc, error) { return f.FakeLoggerEventStreamingClient.StreamResult(args) } diff --git a/go/vt/worker/grpcvtworkerclient/client.go b/go/vt/worker/grpcvtworkerclient/client.go index 5c1742e830b..45cdf23c194 100644 --- a/go/vt/worker/grpcvtworkerclient/client.go +++ b/go/vt/worker/grpcvtworkerclient/client.go @@ -38,14 +38,14 @@ func gRPCVtworkerClientFactory(addr string, dialTimeout time.Duration) (vtworker } // ExecuteVtworkerCommand is part of the VtworkerClient interface. -func (client *gRPCVtworkerClient) ExecuteVtworkerCommand(ctx context.Context, args []string) (<-chan *logutil.LoggerEvent, vtworkerclient.ErrFunc) { +func (client *gRPCVtworkerClient) ExecuteVtworkerCommand(ctx context.Context, args []string) (<-chan *logutil.LoggerEvent, vtworkerclient.ErrFunc, error) { query := &pb.ExecuteVtworkerCommandRequest{ Args: args, } stream, err := client.c.ExecuteVtworkerCommand(ctx, query) if err != nil { - return nil, func() error { return err } + return nil, nil, err } results := make(chan *logutil.LoggerEvent, 1) @@ -71,7 +71,7 @@ func (client *gRPCVtworkerClient) ExecuteVtworkerCommand(ctx context.Context, ar }() return results, func() error { return finalError - } + }, nil } // Close is part of the VtworkerClient interface. diff --git a/go/vt/worker/vtworkerclient/client_testsuite.go b/go/vt/worker/vtworkerclient/client_testsuite.go index 398c6889cc9..5222db0160d 100644 --- a/go/vt/worker/vtworkerclient/client_testsuite.go +++ b/go/vt/worker/vtworkerclient/client_testsuite.go @@ -43,8 +43,8 @@ func TestSuite(t *testing.T, wi *worker.Instance, c VtworkerClient) { } func commandSucceeds(t *testing.T, client VtworkerClient) { - logs, errFunc := client.ExecuteVtworkerCommand(context.Background(), []string{"Ping", "pong"}) - if err := errFunc(); err != nil { + logs, errFunc, err := client.ExecuteVtworkerCommand(context.Background(), []string{"Ping", "pong"}) + if err != nil { t.Fatalf("Cannot execute remote command: %v", err) } @@ -64,15 +64,19 @@ func commandSucceeds(t *testing.T, client VtworkerClient) { t.Fatalf("Remote error: %v", err) } - _, errFuncReset := client.ExecuteVtworkerCommand(context.Background(), []string{"Reset"}) - if err := errFuncReset(); err != nil { + logs, errFunc, err = client.ExecuteVtworkerCommand(context.Background(), []string{"Reset"}) + if err != nil { + t.Fatalf("Cannot execute remote command: %v", err) + } + for _ = range logs { + } + if err := errFunc(); err != nil { t.Fatalf("Cannot execute remote command: %v", err) } } func commandErrors(t *testing.T, client VtworkerClient) { - logs, errFunc := client.ExecuteVtworkerCommand(context.Background(), []string{"NonexistingCommand"}) - err := errFunc() + logs, errFunc, err := client.ExecuteVtworkerCommand(context.Background(), []string{"NonexistingCommand"}) // The expected error could already be seen now or after the output channel is closed. // To avoid checking for the same error twice, we don't check it here yet. @@ -96,8 +100,7 @@ func commandErrors(t *testing.T, client VtworkerClient) { } func commandPanics(t *testing.T, client VtworkerClient) { - logs, errFunc := client.ExecuteVtworkerCommand(context.Background(), []string{"Panic"}) - err := errFunc() + logs, errFunc, err := client.ExecuteVtworkerCommand(context.Background(), []string{"Panic"}) // The expected error could already be seen now or after the output channel is closed. // To avoid checking for the same error twice, we don't check it here yet. diff --git a/go/vt/worker/vtworkerclient/interface.go b/go/vt/worker/vtworkerclient/interface.go index ccb0982c387..3ee50a75da4 100644 --- a/go/vt/worker/vtworkerclient/interface.go +++ b/go/vt/worker/vtworkerclient/interface.go @@ -25,7 +25,7 @@ type ErrFunc func() error type VtworkerClient interface { // ExecuteVtworkerCommand will execute the command remotely. // NOTE: ErrFunc should only be checked after the returned channel was closed to avoid races. - ExecuteVtworkerCommand(ctx context.Context, args []string) (<-chan *logutil.LoggerEvent, ErrFunc) + ExecuteVtworkerCommand(ctx context.Context, args []string) (<-chan *logutil.LoggerEvent, ErrFunc, error) // Close will terminate the connection. This object won't be // used after this. diff --git a/go/vt/worker/vtworkerclient/wrapper.go b/go/vt/worker/vtworkerclient/wrapper.go index 8fa0eb46a77..c7cd3b33950 100644 --- a/go/vt/worker/vtworkerclient/wrapper.go +++ b/go/vt/worker/vtworkerclient/wrapper.go @@ -28,8 +28,8 @@ func RunCommandAndWait(ctx context.Context, server string, args []string, recv f defer client.Close() // run the command - c, errFunc := client.ExecuteVtworkerCommand(ctx, args) - if err = errFunc(); err != nil { + c, errFunc, err := client.ExecuteVtworkerCommand(ctx, args) + if err != nil { return fmt.Errorf("Cannot execute remote command: %v", err) } diff --git a/go/vt/wrangler/testlib/vtctl_pipe.go b/go/vt/wrangler/testlib/vtctl_pipe.go index cf2269c3808..fe4725a170c 100644 --- a/go/vt/wrangler/testlib/vtctl_pipe.go +++ b/go/vt/wrangler/testlib/vtctl_pipe.go @@ -73,7 +73,10 @@ func (vp *VtctlPipe) Run(args []string) error { lockTimeout := 10 * time.Second ctx := context.Background() - c, errFunc := vp.client.ExecuteVtctlCommand(ctx, args, actionTimeout, lockTimeout) + c, errFunc, err := vp.client.ExecuteVtctlCommand(ctx, args, actionTimeout, lockTimeout) + if err != nil { + return err + } for le := range c { vp.t.Logf(le.String()) }