Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 50 additions & 9 deletions go/vt/worker/restartable_result_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type RestartableResultReader struct {
chunk chunk
// allowMultipleRetries is true if we are allowed to retry more than once.
allowMultipleRetries bool
// if we are running inside a transaction, this will hold a non-zero value
txID int64

query string

Expand Down Expand Up @@ -82,6 +84,15 @@ func NewRestartableResultReader(ctx context.Context, logger logutil.Logger, tp t
allowMultipleRetries: allowMultipleRetries,
}

err := tryToConnect(r)
if err != nil {
return nil, err
}
return r, nil
}

func tryToConnect(r *RestartableResultReader) error {

// If the initial connection fails we retry once.
// Note: The first retry will be the second attempt.
attempt := 0
Expand All @@ -97,15 +108,35 @@ func NewRestartableResultReader(ctx context.Context, logger logutil.Logger, tp t
err = fmt.Errorf("tablet=%v: %v", topoproto.TabletAliasString(r.tablet.Alias), err)
goto retry
}
return r, nil
return nil

retry:
if !retryable || attempt > 1 {
return nil, fmt.Errorf("failed to initialize tablet connection: retryable %v, %v", retryable, err)
return fmt.Errorf("failed to initialize tablet connection: retryable %v, %v", retryable, err)
}
statsRetryCount.Add(1)
logger.Infof("retrying after error: %v", err)
log.Infof("retrying after error: %v", err)
}

}

// NewTransactionalRestartableResultReader does the same thing that NewRestartableResultReader does,
// but works inside of a single transaction
func NewTransactionalRestartableResultReader(ctx context.Context, logger logutil.Logger, tp tabletProvider, td *tabletmanagerdatapb.TableDefinition, chunk chunk, allowMultipleRetries bool, txID int64) (*RestartableResultReader, error) {
r := &RestartableResultReader{
ctx: ctx,
logger: logger,
tp: tp,
td: td,
chunk: chunk,
allowMultipleRetries: allowMultipleRetries,
txID: txID,
}
err := tryToConnect(r)
if err != nil {
return nil, err
}
return r, nil
}

// getTablet (re)sets the tablet which is used for the streaming query.
Expand Down Expand Up @@ -145,11 +176,21 @@ func (r *RestartableResultReader) getTablet() (bool, error) {
func (r *RestartableResultReader) startStream() (bool, error) {
// Start the streaming query.
r.generateQuery()
stream := queryservice.ExecuteWithStreamer(r.ctx, r.conn, &querypb.Target{
Keyspace: r.tablet.Keyspace,
Shard: r.tablet.Shard,
TabletType: r.tablet.Type,
}, r.query, make(map[string]*querypb.BindVariable), nil)
var stream sqltypes.ResultStream

if r.txID == 0 {
stream = queryservice.ExecuteWithStreamer(r.ctx, r.conn, &querypb.Target{
Keyspace: r.tablet.Keyspace,
Shard: r.tablet.Shard,
TabletType: r.tablet.Type,
}, r.query, make(map[string]*querypb.BindVariable), nil)
} else {
stream = queryservice.ExecuteWithTransactionalStreamer(r.ctx, r.conn, &querypb.Target{
Keyspace: r.tablet.Keyspace,
Shard: r.tablet.Shard,
TabletType: r.tablet.Type,
}, r.query, make(map[string]*querypb.BindVariable), r.txID, nil)
}

// Read the fields information.
cols, err := stream.Recv()
Expand Down Expand Up @@ -387,4 +428,4 @@ func greaterThanTupleWhereClause(columns []string, row []sqltypes.Value) []strin
clauses = append(clauses, b.String())

return clauses
}
}
10 changes: 10 additions & 0 deletions go/vt/worker/result_merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"

"golang.org/x/net/context"
"vitess.io/vitess/go/vt/vterrors"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -177,6 +178,15 @@ func (rm *ResultMerger) Next() (*sqltypes.Result, error) {
return result, nil
}

// Close closes all inputs
func (rm *ResultMerger) Close(ctx context.Context) {
for _, i := range rm.inputs {
i.Close(ctx)
}
}



func (rm *ResultMerger) deleteInput(deleteMe ResultReader) {
for i, input := range rm.inputs {
if input == deleteMe {
Expand Down
9 changes: 9 additions & 0 deletions go/vt/worker/result_merger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"golang.org/x/net/context"
"vitess.io/vitess/go/sqltypes"

querypb "vitess.io/vitess/go/vt/proto/query"
Expand Down Expand Up @@ -110,6 +111,10 @@ func (f *fakeResultReader) Fields() []*querypb.Field {
return f.fields
}

// Close closes nothing
func (f *fakeResultReader) Close(ctx context.Context) {
}

// Next returns the next fake result. It is part of the ResultReader interface.
func (f *fakeResultReader) Next() (*sqltypes.Result, error) {
if f.rowsReturned == f.rowsTotal {
Expand Down Expand Up @@ -387,6 +392,10 @@ func (m *memoryResultReader) Next() (*sqltypes.Result, error) {
return result, nil
}

func (m *memoryResultReader) Close(ctx context.Context) {
// intentionally blank. we have nothing we need to close
}

// benchmarkResult is a package level variable whose sole purpose is to
// reference output from the Benchmark* functions below.
// This was suggested by http://dave.cheney.net/2013/06/30/how-to-write-benchmarks-in-go
Expand Down
3 changes: 3 additions & 0 deletions go/vt/worker/result_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package worker

import (
"context"

"vitess.io/vitess/go/sqltypes"

querypb "vitess.io/vitess/go/vt/proto/query"
Expand All @@ -39,4 +41,5 @@ type ResultReader interface {
// It returns the next result on the stream.
// It will return io.EOF if the stream ended.
Next() (*sqltypes.Result, error)
Close(ctx context.Context)
}
Loading