Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/mycnf/default-fast.cnf
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ key_buffer_size = 2M
log-error = {{.ErrorLogPath}}
long_query_time = 2
max_allowed_packet = 16M
max_connections = 100
max_connections = 200
net_write_timeout = 60
pid-file = {{.PidFile}}
port = {{.MysqlPort}}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/worker/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (e *executor) checkError(ctx context.Context, err error, isRetry bool, mast
e.wr.Logger().Warningf("ExecuteFetch failed on %v; will reresolve and retry because it's due to a MySQL read-only error: %v", tabletString, err)
statsRetryCount.Add(1)
statsRetryCounters.Add(retryCategoryReadOnly, 1)
case errNo == "2002" || errNo == "2006" || errNo == "2013":
case errNo == "2002" || errNo == "2006" || errNo == "2013" || errNo == "1053":
// Note:
// "2006" happens if the connection is already dead. Retrying a query in
// this case is safe.
Expand All @@ -229,6 +229,7 @@ func (e *executor) checkError(ctx context.Context, err error, isRetry bool, mast
// it was aborted. If we retry the query and get a duplicate entry error, we
// assume that the previous execution was successful and ignore the error.
// See below for the handling of duplicate entry error "1062".
// "1053" is mysql shutting down
e.wr.Logger().Warningf("ExecuteFetch failed on %v; will reresolve and retry because it's due to a MySQL connection error: %v", tabletString, err)
statsRetryCount.Add(1)
statsRetryCounters.Add(retryCategoryConnectionError, 1)
Expand Down
32 changes: 24 additions & 8 deletions go/vt/worker/restartable_result_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,30 @@ func NewRestartableResultReader(ctx context.Context, logger logutil.Logger, tp t
allowMultipleRetries: allowMultipleRetries,
}

// If the initial connection fails, we do not restart.
if _ /* retryable */, err := r.getTablet(); err != nil {
return nil, vterrors.Wrap(err, "tablet=unknown")
}
if _ /* retryable */, err := r.startStream(); err != nil {
return nil, vterrors.Wrapf(err, "tablet=%v", topoproto.TabletAliasString(r.tablet.Alias))
// If the initial connection fails, we retry once.
// Note: The first retry will be the second attempt.
attempt := 0
for {
attempt++
var err error
var retryable bool
if retryable, err = r.getTablet(); err != nil {
err = fmt.Errorf("tablet=unknown: %v", err)
goto retry
}
if retryable, err = r.startStream(); err != nil {
err = fmt.Errorf("tablet=%v: %v", topoproto.TabletAliasString(r.tablet.Alias), err)
goto retry
}
return r, nil

retry:
if !retryable || attempt > 1 {
return nil, fmt.Errorf("failed to initialize tablet connection: retryable %v, %v", retryable, err)
}
statsRetryCount.Add(1)
logger.Infof("retrying after error: %v", err)
}
return r, nil
}

// getTablet (re)sets the tablet which is used for the streaming query.
Expand Down Expand Up @@ -172,7 +188,7 @@ func (r *RestartableResultReader) nextWithRetries() (*sqltypes.Result, error) {
retryCtx, retryCancel := context.WithTimeout(r.ctx, *retryDuration)
defer retryCancel()

// Note: The first retry will be the second attempt.
// The first retry is the second attempt because Next() already made one attempt.
attempt := 1
start := time.Now()
for {
Expand Down
27 changes: 5 additions & 22 deletions go/vt/worker/split_clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,29 +433,20 @@ func (scw *SplitCloneWorker) run(ctx context.Context) error {
return err
}

// Phase 2a: Find destination master tablets.
// Phase 2: Find destination master tablets.
if err := scw.findDestinationMasters(ctx); err != nil {
return vterrors.Wrap(err, "findDestinationMasters() failed")
}
if err := checkDone(ctx); err != nil {
return err
}
// Phase 2b: Wait for minimum number of destination tablets (required for the
// diff). Note that while we wait for the minimum number, we'll always use
// *all* available RDONLY tablets from each destination shard.
if err := scw.waitForTablets(ctx, scw.destinationShards, *waitForHealthyTabletsTimeout); err != nil {
return vterrors.Wrap(err, "waitForDestinationTablets(destinationShards) failed")
}
if err := checkDone(ctx); err != nil {
return err
}

// Phase 3: (optional) online clone.
if scw.online {
scw.wr.Logger().Infof("Online clone will be run now.")
// 3a: Wait for minimum number of source tablets (required for the diff).
if err := scw.waitForTablets(ctx, scw.sourceShards, *waitForHealthyTabletsTimeout); err != nil {
return vterrors.Wrap(err, "waitForDestinationTablets(sourceShards) failed")
return vterrors.Wrap(err, "waitForTablets(sourceShards) failed")
}
// 3b: Clone the data.
start := time.Now()
Expand Down Expand Up @@ -784,7 +775,7 @@ func (scw *SplitCloneWorker) waitForTablets(ctx context.Context, shardInfos []*t
var wg sync.WaitGroup
rec := concurrency.AllErrorRecorder{}

if len(shardInfos) > 0 {
if scw.minHealthyRdonlyTablets > 0 && len(shardInfos) > 0 {
scw.wr.Logger().Infof("Waiting %v for %d %s/%s RDONLY tablet(s)", timeout, scw.minHealthyRdonlyTablets, shardInfos[0].Keyspace(), shardInfos[0].ShardName())
}

Expand Down Expand Up @@ -976,7 +967,7 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState)
// longer stopped at the same point as we took it offline initially.
allowMultipleRetries = false
} else {
tp = newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName())
tp = newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), topodatapb.TabletType_RDONLY)
}
sourceResultReader, err := NewRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, allowMultipleRetries)
if err != nil {
Expand All @@ -987,16 +978,8 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState)
sourceReaders[shardIndex] = sourceResultReader
}

// Wait for enough healthy tablets (they might have become unhealthy
// and their replication lag might have increased due to a previous
// chunk pipeline.)
if err := scw.waitForTablets(ctx, scw.destinationShards, *retryDuration); err != nil {
processError("%v: No healthy destination tablets found (gave up after %v): ", errPrefix, time.Since(start), err)
return
}

for shardIndex, si := range scw.destinationShards {
tp := newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName())
tp := newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER)
destResultReader, err := NewRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, true /* allowMultipleRetries */)
if err != nil {
processError("%v: NewRestartableResultReader for destination: %v failed: %v", errPrefix, tp.description(), err)
Expand Down
Loading