Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions go/vt/vtctl/fakevtctlclient/fake_loggerevent_streamingclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ func (f *FakeLoggerEventStreamingClient) RegisterResult(args []string, output st
}

// StreamResult returns a channel which streams back a registered result as logging events.
func (f *FakeLoggerEventStreamingClient) StreamResult(args []string) (<-chan *logutil.LoggerEvent, func() error) {
func (f *FakeLoggerEventStreamingClient) StreamResult(args []string) (<-chan *logutil.LoggerEvent, func() error, error) {
result, ok := f.results[fromSlice(args)]
if !ok {
return nil, func() error { return fmt.Errorf("No response was registered for args: %v", args) }
return nil, nil, fmt.Errorf("No response was registered for args: %v", args)
}

stream := make(chan *logutil.LoggerEvent)
Expand All @@ -71,9 +71,5 @@ func (f *FakeLoggerEventStreamingClient) StreamResult(args []string) (<-chan *lo
close(stream)
}()

if result.err != nil {
return stream, func() error { return result.err }
}

return stream, func() error { return nil }
return stream, func() error { return result.err }, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func verifyStreamOutputAndError(t *testing.T, wantErr error) {
t.Fatal(err)
}

stream, errFunc := fake.StreamResult(args)
stream, errFunc, err := fake.StreamResult(args)

// Verify output and error.
i := 0
Expand All @@ -53,16 +53,16 @@ func verifyStreamOutputAndError(t *testing.T, wantErr error) {

func TestNoResultRegistered(t *testing.T) {
fake := NewFakeLoggerEventStreamingClient()
stream, errFunc := fake.StreamResult([]string{"ListShardTablets", "test_keyspace/0"})
stream, errFunc, err := fake.StreamResult([]string{"ListShardTablets", "test_keyspace/0"})
if stream != nil {
t.Fatalf("No stream should have been returned because no matching result is registered.")
}
if errFunc() == nil {
if errFunc != nil {
t.Fatalf("Executing the command should fail because no matching result is registered.")
}
wantErr := "No response was registered for args: [ListShardTablets test_keyspace/0]"
if errFunc().Error() != wantErr {
t.Errorf("Wrong error for missing result was returned. got: '%v' want: '%v'", errFunc(), wantErr)
if err.Error() != wantErr {
t.Errorf("Wrong error for missing result was returned. got: '%v' want: '%v'", err, wantErr)
}
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/fakevtctlclient/fakevtctlclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (f *FakeVtctlClient) FakeVtctlClientFactory(addr string, dialTimeout time.D
}

// ExecuteVtctlCommand is part of the vtctlclient interface.
func (f *FakeVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout, lockTimeout time.Duration) (<-chan *logutil.LoggerEvent, vtctlclient.ErrFunc) {
func (f *FakeVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout, lockTimeout time.Duration) (<-chan *logutil.LoggerEvent, vtctlclient.ErrFunc, error) {
return f.FakeLoggerEventStreamingClient.StreamResult(args)
}

Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtctl/gorpcvtctlclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ func goRPCVtctlClientFactory(addr string, dialTimeout time.Duration) (vtctlclien
// ExecuteVtctlCommand is part of the VtctlClient interface.
// Note the bson rpc version doesn't honor timeouts in the context
// (but the server side will honor the actionTimeout)
func (client *goRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout, lockTimeout time.Duration) (<-chan *logutil.LoggerEvent, vtctlclient.ErrFunc) {
func (client *goRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout, lockTimeout time.Duration) (<-chan *logutil.LoggerEvent, vtctlclient.ErrFunc, error) {
req := &gorpcproto.ExecuteVtctlCommandArgs{
Args: args,
ActionTimeout: actionTimeout,
LockTimeout: lockTimeout,
}
sr := make(chan *logutil.LoggerEvent, 10)
c := client.rpcClient.StreamGo("VtctlServer.ExecuteVtctlCommand", req, sr)
return sr, func() error { return c.Error }
return sr, func() error { return c.Error }, nil
}

// Close is part of the VtctlClient interface
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtctl/grpcvtctlclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func gRPCVtctlClientFactory(addr string, dialTimeout time.Duration) (vtctlclient
}

// ExecuteVtctlCommand is part of the VtctlClient interface
func (client *gRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout, lockTimeout time.Duration) (<-chan *logutil.LoggerEvent, vtctlclient.ErrFunc) {
func (client *gRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout, lockTimeout time.Duration) (<-chan *logutil.LoggerEvent, vtctlclient.ErrFunc, error) {
query := &pb.ExecuteVtctlCommandRequest{
Args: args,
ActionTimeout: int64(actionTimeout.Nanoseconds()),
Expand All @@ -47,7 +47,7 @@ func (client *gRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []s

stream, err := client.c.ExecuteVtctlCommand(ctx, query)
if err != nil {
return nil, func() error { return err }
return nil, nil, err
}

results := make(chan *logutil.LoggerEvent, 1)
Expand All @@ -67,7 +67,7 @@ func (client *gRPCVtctlClient) ExecuteVtctlCommand(ctx context.Context, args []s
}()
return results, func() error {
return finalError
}
}, nil
}

// Close is part of the VtctlClient interface
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/vtctlclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type ErrFunc func() error
type VtctlClient interface {
// ExecuteVtctlCommand will execute the command remotely
// NOTE: ErrFunc should only be checked after the returned channel was closed to avoid races.
ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout, lockTimeout time.Duration) (<-chan *logutil.LoggerEvent, ErrFunc)
ExecuteVtctlCommand(ctx context.Context, args []string, actionTimeout, lockTimeout time.Duration) (<-chan *logutil.LoggerEvent, ErrFunc, error)

// Close will terminate the connection. This object won't be
// used after this.
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtctl/vtctlclient/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ func RunCommandAndWait(ctx context.Context, server string, args []string, dialTi
// run the command
ctx, cancel := context.WithTimeout(context.Background(), actionTimeout)
defer cancel()
c, errFunc := client.ExecuteVtctlCommand(ctx, args, actionTimeout, lockWaitTimeout)
if err = errFunc(); err != nil {
c, errFunc, err := client.ExecuteVtctlCommand(ctx, args, actionTimeout, lockWaitTimeout)
if err != nil {
return fmt.Errorf("Cannot execute remote command: %v", err)
}

Expand Down
15 changes: 12 additions & 3 deletions go/vt/vtctl/vtctlclienttest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ func TestSuite(t *testing.T, ts topo.Server, client vtctlclient.VtctlClient) {
}

// run a command that's gonna return something on the log channel
logs, errFunc := client.ExecuteVtctlCommand(ctx, []string{"ListAllTablets", "cell1"}, 30*time.Second, 10*time.Second)
logs, errFunc, err := client.ExecuteVtctlCommand(ctx, []string{"ListAllTablets", "cell1"}, 30*time.Second, 10*time.Second)
if err != nil {
t.Fatalf("Remote error: %v", err)
}
count := 0
for e := range logs {
expected := "cell1-0000000001 test_keyspace <null> master localhost:3333 localhost:3334 [tag: \"value\"]\n"
Expand All @@ -73,7 +76,10 @@ func TestSuite(t *testing.T, ts topo.Server, client vtctlclient.VtctlClient) {
}

// run a command that's gonna fail
logs, errFunc = client.ExecuteVtctlCommand(ctx, []string{"ListAllTablets", "cell2"}, 30*time.Second, 10*time.Second)
logs, errFunc, err = client.ExecuteVtctlCommand(ctx, []string{"ListAllTablets", "cell2"}, 30*time.Second, 10*time.Second)
if err != nil {
t.Fatalf("Remote error: %v", err)
}
if e, ok := <-logs; ok {
t.Errorf("Got unexpected line for logs: %v", e.String())
}
Expand All @@ -84,7 +90,10 @@ func TestSuite(t *testing.T, ts topo.Server, client vtctlclient.VtctlClient) {
}

// run a command that's gonna panic
logs, errFunc = client.ExecuteVtctlCommand(ctx, []string{"Panic"}, 30*time.Second, 10*time.Second)
logs, errFunc, err = client.ExecuteVtctlCommand(ctx, []string{"Panic"}, 30*time.Second, 10*time.Second)
if err != nil {
t.Fatalf("Remote error: %v", err)
}
if e, ok := <-logs; ok {
t.Errorf("Got unexpected line for logs: %v", e.String())
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/worker/fakevtworkerclient/fakevtworkerclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (f *FakeVtworkerClient) FakeVtworkerClientFactory(addr string, dialTimeout
}

// ExecuteVtworkerCommand is part of the vtworkerclient interface.
func (f *FakeVtworkerClient) ExecuteVtworkerCommand(ctx context.Context, args []string) (<-chan *logutil.LoggerEvent, vtworkerclient.ErrFunc) {
func (f *FakeVtworkerClient) ExecuteVtworkerCommand(ctx context.Context, args []string) (<-chan *logutil.LoggerEvent, vtworkerclient.ErrFunc, error) {
return f.FakeLoggerEventStreamingClient.StreamResult(args)
}

Expand Down
6 changes: 3 additions & 3 deletions go/vt/worker/grpcvtworkerclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ func gRPCVtworkerClientFactory(addr string, dialTimeout time.Duration) (vtworker
}

// ExecuteVtworkerCommand is part of the VtworkerClient interface.
func (client *gRPCVtworkerClient) ExecuteVtworkerCommand(ctx context.Context, args []string) (<-chan *logutil.LoggerEvent, vtworkerclient.ErrFunc) {
func (client *gRPCVtworkerClient) ExecuteVtworkerCommand(ctx context.Context, args []string) (<-chan *logutil.LoggerEvent, vtworkerclient.ErrFunc, error) {
query := &pb.ExecuteVtworkerCommandRequest{
Args: args,
}

stream, err := client.c.ExecuteVtworkerCommand(ctx, query)
if err != nil {
return nil, func() error { return err }
return nil, nil, err
}

results := make(chan *logutil.LoggerEvent, 1)
Expand All @@ -71,7 +71,7 @@ func (client *gRPCVtworkerClient) ExecuteVtworkerCommand(ctx context.Context, ar
}()
return results, func() error {
return finalError
}
}, nil
}

// Close is part of the VtworkerClient interface.
Expand Down
19 changes: 11 additions & 8 deletions go/vt/worker/vtworkerclient/client_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func TestSuite(t *testing.T, wi *worker.Instance, c VtworkerClient) {
}

func commandSucceeds(t *testing.T, client VtworkerClient) {
logs, errFunc := client.ExecuteVtworkerCommand(context.Background(), []string{"Ping", "pong"})
if err := errFunc(); err != nil {
logs, errFunc, err := client.ExecuteVtworkerCommand(context.Background(), []string{"Ping", "pong"})
if err != nil {
t.Fatalf("Cannot execute remote command: %v", err)
}

Expand All @@ -64,15 +64,19 @@ func commandSucceeds(t *testing.T, client VtworkerClient) {
t.Fatalf("Remote error: %v", err)
}

_, errFuncReset := client.ExecuteVtworkerCommand(context.Background(), []string{"Reset"})
if err := errFuncReset(); err != nil {
logs, errFunc, err = client.ExecuteVtworkerCommand(context.Background(), []string{"Reset"})
if err != nil {
t.Fatalf("Cannot execute remote command: %v", err)
}
for _ = range logs {
}
if err := errFunc(); err != nil {
t.Fatalf("Cannot execute remote command: %v", err)
}
}

func commandErrors(t *testing.T, client VtworkerClient) {
logs, errFunc := client.ExecuteVtworkerCommand(context.Background(), []string{"NonexistingCommand"})
err := errFunc()
logs, errFunc, err := client.ExecuteVtworkerCommand(context.Background(), []string{"NonexistingCommand"})
// The expected error could already be seen now or after the output channel is closed.
// To avoid checking for the same error twice, we don't check it here yet.

Expand All @@ -96,8 +100,7 @@ func commandErrors(t *testing.T, client VtworkerClient) {
}

func commandPanics(t *testing.T, client VtworkerClient) {
logs, errFunc := client.ExecuteVtworkerCommand(context.Background(), []string{"Panic"})
err := errFunc()
logs, errFunc, err := client.ExecuteVtworkerCommand(context.Background(), []string{"Panic"})
// The expected error could already be seen now or after the output channel is closed.
// To avoid checking for the same error twice, we don't check it here yet.

Expand Down
2 changes: 1 addition & 1 deletion go/vt/worker/vtworkerclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type ErrFunc func() error
type VtworkerClient interface {
// ExecuteVtworkerCommand will execute the command remotely.
// NOTE: ErrFunc should only be checked after the returned channel was closed to avoid races.
ExecuteVtworkerCommand(ctx context.Context, args []string) (<-chan *logutil.LoggerEvent, ErrFunc)
ExecuteVtworkerCommand(ctx context.Context, args []string) (<-chan *logutil.LoggerEvent, ErrFunc, error)

// Close will terminate the connection. This object won't be
// used after this.
Expand Down
4 changes: 2 additions & 2 deletions go/vt/worker/vtworkerclient/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func RunCommandAndWait(ctx context.Context, server string, args []string, recv f
defer client.Close()

// run the command
c, errFunc := client.ExecuteVtworkerCommand(ctx, args)
if err = errFunc(); err != nil {
c, errFunc, err := client.ExecuteVtworkerCommand(ctx, args)
if err != nil {
return fmt.Errorf("Cannot execute remote command: %v", err)
}

Expand Down
5 changes: 4 additions & 1 deletion go/vt/wrangler/testlib/vtctl_pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ func (vp *VtctlPipe) Run(args []string) error {
lockTimeout := 10 * time.Second
ctx := context.Background()

c, errFunc := vp.client.ExecuteVtctlCommand(ctx, args, actionTimeout, lockTimeout)
c, errFunc, err := vp.client.ExecuteVtctlCommand(ctx, args, actionTimeout, lockTimeout)
if err != nil {
return err
}
for le := range c {
vp.t.Logf(le.String())
}
Expand Down