Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions go/vt/worker/restartable_result_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,30 @@ func NewRestartableResultReader(ctx context.Context, logger logutil.Logger, tp t
allowMultipleRetries: allowMultipleRetries,
}

// If the initial connection fails, we do not restart.
if _ /* retryable */, err := r.getTablet(); err != nil {
return nil, fmt.Errorf("tablet=unknown: %v", err)
}
if _ /* retryable */, err := r.startStream(); err != nil {
return nil, fmt.Errorf("tablet=%v: %v", topoproto.TabletAliasString(r.tablet.Alias), err)
// If the initial connection fails we retry once.
// Note: The first retry will be the second attempt.
attempt := 0
for {
attempt++
var err error
var retryable bool
if retryable, err = r.getTablet(); err != nil {
err = fmt.Errorf("tablet=unknown: %v", err)
goto retry
}
if retryable, err = r.startStream(); err != nil {
err = fmt.Errorf("tablet=%v: %v", topoproto.TabletAliasString(r.tablet.Alias), err)
goto retry
}
return r, nil

retry:
if !retryable || attempt > 1 {
return nil, fmt.Errorf("failed to initialize tablet connection: retryable %v, %v", retryable, err)
}
logger.Infof("retrying after error: %v", err)
statsRetryCount.Add(1)
}
return r, nil
}

// getTablet (re)sets the tablet which is used for the streaming query.
Expand Down Expand Up @@ -171,8 +187,7 @@ func (r *RestartableResultReader) nextWithRetries() (*sqltypes.Result, error) {
retryCtx, retryCancel := context.WithTimeout(r.ctx, *retryDuration)
defer retryCancel()

// Note: The first retry will be the second attempt.
attempt := 1
attempt := 0
start := time.Now()
for {
attempt++
Expand Down
25 changes: 4 additions & 21 deletions go/vt/worker/split_clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,22 +438,13 @@ func (scw *SplitCloneWorker) run(ctx context.Context) error {
return err
}

// Phase 2a: Find destination master tablets.
// Phase 2: Find destination master tablets.
if err := scw.findDestinationMasters(ctx); err != nil {
return fmt.Errorf("findDestinationMasters() failed: %v", err)
}
if err := checkDone(ctx); err != nil {
return err
}
// Phase 2b: Wait for minimum number of destination tablets (required for the
// diff). Note that while we wait for the minimum number, we'll always use
// *all* available RDONLY tablets from each destination shard.
if err := scw.waitForTablets(ctx, scw.destinationShards, *waitForHealthyTabletsTimeout); err != nil {
return fmt.Errorf("waitForDestinationTablets(destinationShards) failed: %v", err)
}
if err := checkDone(ctx); err != nil {
return err
}

// Phase 3: (optional) online clone.
if scw.online {
Expand Down Expand Up @@ -778,7 +769,7 @@ func (scw *SplitCloneWorker) waitForTablets(ctx context.Context, shardInfos []*t
var wg sync.WaitGroup
rec := concurrency.AllErrorRecorder{}

if len(shardInfos) > 0 {
if scw.minHealthyRdonlyTablets > 0 && len(shardInfos) > 0 {
scw.wr.Logger().Infof("Waiting %v for %d %s/%s RDONLY tablet(s)", timeout, scw.minHealthyRdonlyTablets, shardInfos[0].Keyspace(), shardInfos[0].ShardName())
}

Expand Down Expand Up @@ -970,7 +961,7 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState)
// longer stopped at the same point as we took it offline initially.
allowMultipleRetries = false
} else {
tp = newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName())
tp = newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), topodatapb.TabletType_RDONLY)
}
sourceResultReader, err := NewRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, allowMultipleRetries)
if err != nil {
Expand All @@ -981,16 +972,8 @@ func (scw *SplitCloneWorker) clone(ctx context.Context, state StatusWorkerState)
sourceReaders[shardIndex] = sourceResultReader
}

// Wait for enough healthy tablets (they might have become unhealthy
// and their replication lag might have increased due to a previous
// chunk pipeline.)
if err := scw.waitForTablets(ctx, scw.destinationShards, *retryDuration); err != nil {
processError("%v: No healthy destination tablets found (gave up after %v): ", errPrefix, time.Since(start), err)
return
}

for shardIndex, si := range scw.destinationShards {
tp := newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName())
tp := newShardTabletProvider(scw.tsc, scw.tabletTracker, si.Keyspace(), si.ShardName(), topodatapb.TabletType_MASTER)
destResultReader, err := NewRestartableResultReader(ctx, scw.wr.Logger(), tp, td, chunk, true /* allowMultipleRetries */)
if err != nil {
processError("%v: NewRestartableResultReader for destination: %v failed: %v", errPrefix, tp.description(), err)
Expand Down
106 changes: 59 additions & 47 deletions go/vt/worker/split_clone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const (
)

var (
errReadOnly = errors.New("The MariaDB server is running with the --read-only option so it cannot execute this statement (errno 1290) during query:")
errReadOnly = errors.New("the MariaDB server is running with the --read-only option so it cannot execute this statement (errno 1290) during query: ")

errStreamingQueryTimeout = errors.New("vttablet: generic::unknown: error: the query was killed either because it timed out or was canceled: (errno 2013) (sqlstate HY000) during query: ")
)
Expand All @@ -76,9 +76,9 @@ type splitCloneTestCase struct {

// Destination tablets.
leftMasterFakeDb *fakesqldb.DB
leftMasterQs *fakes.StreamHealthQueryService
leftMasterQs *testQueryService
rightMasterFakeDb *fakesqldb.DB
rightMasterQs *fakes.StreamHealthQueryService
rightMasterQs *testQueryService

// leftReplica is used by the reparent test.
leftReplica *testlib.FakeTablet
Expand All @@ -92,14 +92,23 @@ type splitCloneTestCase struct {

// defaultWorkerArgs are the full default arguments to run SplitClone.
defaultWorkerArgs []string

// Used to restore the default values after the test run
defaultExecuteFetchRetryTime time.Duration
defaultRetryDuration time.Duration
}

func (tc *splitCloneTestCase) setUp(v3 bool) {
tc.setUpWithConcurreny(v3, 10, 2, splitCloneTestRowsCount)
tc.setUpWithConcurrency(v3, 10, 2, splitCloneTestRowsCount)
}

func (tc *splitCloneTestCase) setUpWithConcurreny(v3 bool, concurrency, writeQueryMaxRows, rowsCount int) {
func (tc *splitCloneTestCase) setUpWithConcurrency(v3 bool, concurrency, writeQueryMaxRows, rowsCount int) {
*useV3ReshardingMode = v3

// Reset some retry flags for the tests that change that
tc.defaultRetryDuration = *retryDuration
tc.defaultExecuteFetchRetryTime = *executeFetchRetryTime

tc.ts = memorytopo.NewServer("cell1", "cell2")
ctx := context.Background()
tc.wi = NewInstance(tc.ts, "cell1", time.Second)
Expand Down Expand Up @@ -247,12 +256,13 @@ func (tc *splitCloneTestCase) setUpWithConcurreny(v3 bool, concurrency, writeQue
expectBlpCheckpointCreationQueries(tc.rightMasterFakeDb)

// Fake stream health responses because vtworker needs them to find the master.
tc.leftMasterQs = fakes.NewStreamHealthQueryService(leftMaster.Target())
tc.leftMasterQs.AddDefaultHealthResponse()
shqs := fakes.NewStreamHealthQueryService(leftMaster.Target())
shqs.AddDefaultHealthResponse()
tc.leftMasterQs = newTestQueryService(tc.t, leftMaster.Target(), shqs, 0, 2, topoproto.TabletAliasString(leftMaster.Tablet.Alias), false /* omitKeyspaceID */)
tc.leftReplicaQs = fakes.NewStreamHealthQueryService(leftReplica.Target())
tc.leftReplicaQs.AddDefaultHealthResponse()
tc.rightMasterQs = fakes.NewStreamHealthQueryService(rightMaster.Target())
tc.rightMasterQs.AddDefaultHealthResponse()
shqs = fakes.NewStreamHealthQueryService(rightMaster.Target())
shqs.AddDefaultHealthResponse()
tc.rightMasterQs = newTestQueryService(tc.t, rightMaster.Target(), shqs, 1, 2, topoproto.TabletAliasString(rightMaster.Tablet.Alias), false /* omitKeyspaceID */)
grpcqueryservice.Register(leftMaster.RPCServer, tc.leftMasterQs)
grpcqueryservice.Register(leftReplica.RPCServer, tc.leftReplicaQs)
grpcqueryservice.Register(rightMaster.RPCServer, tc.rightMasterQs)
Expand All @@ -278,6 +288,9 @@ func (tc *splitCloneTestCase) setUpWithConcurreny(v3 bool, concurrency, writeQue
}

func (tc *splitCloneTestCase) tearDown() {
*retryDuration = tc.defaultRetryDuration
*executeFetchRetryTime = tc.defaultExecuteFetchRetryTime

for _, ft := range tc.tablets {
ft.StopActionLoop(tc.t)
}
Expand Down Expand Up @@ -521,7 +534,7 @@ func TestSplitCloneV2_Offline(t *testing.T) {
// get processed concurrently while the other pending ones are blocked.
func TestSplitCloneV2_Offline_HighChunkCount(t *testing.T) {
tc := &splitCloneTestCase{t: t}
tc.setUpWithConcurreny(false /* v3 */, 10, 5 /* writeQueryMaxRows */, 1000 /* rowsCount */)
tc.setUpWithConcurrency(false /* v3 */, 10, 5 /* writeQueryMaxRows */, 1000 /* rowsCount */)
defer tc.tearDown()

args := make([]string, len(tc.defaultWorkerArgs))
Expand All @@ -547,6 +560,9 @@ func TestSplitCloneV2_Offline_RestartStreamingQuery(t *testing.T) {
tc.setUp(false /* v3 */)
defer tc.tearDown()

// Only wait 1 ms between retries, so that the test passes faster.
*executeFetchRetryTime = 1 * time.Millisecond

// Ensure that this test uses only the first tablet. This makes it easier
// to verify that the restart actually happened for that tablet.
// SplitClone will ignore the second tablet because we set its replication lag
Expand All @@ -569,7 +585,7 @@ func TestSplitCloneV2_Offline_RestartStreamingQuery(t *testing.T) {
}

alias := tc.sourceRdonlyQs[0].alias
if got, want := statsStreamingQueryErrorsCounters.Counts()[alias], int64(1); got != want {
if got, want := statsStreamingQueryErrorsCounters.Counts()[alias], int64(2); got != want {
t.Errorf("wrong number of errored streaming query for tablet: %v: got = %v, want = %v", alias, got, want)
}
if got, want := statsStreamingQueryCounters.Counts()[alias], int64(11); got != want {
Expand All @@ -582,9 +598,12 @@ func TestSplitCloneV2_Offline_RestartStreamingQuery(t *testing.T) {
// of the streaming query does not succeed here and instead vtworker will fail.
func TestSplitCloneV2_Offline_FailOverStreamingQuery_NotAllowed(t *testing.T) {
tc := &splitCloneTestCase{t: t}
tc.setUpWithConcurreny(false /* v3 */, 1, 10, splitCloneTestRowsCount)
tc.setUpWithConcurrency(false /* v3 */, 1, 10, splitCloneTestRowsCount)
defer tc.tearDown()

// Only wait 1 ms between retries, so that the test passes faster.
*executeFetchRetryTime = 1 * time.Millisecond

// Ensure that this test uses only the first tablet.
tc.sourceRdonlyQs[1].AddHealthResponseWithSecondsBehindMaster(3600)

Expand All @@ -605,7 +624,7 @@ func TestSplitCloneV2_Offline_FailOverStreamingQuery_NotAllowed(t *testing.T) {
}

alias := tc.sourceRdonlyQs[0].alias
if got, want := statsStreamingQueryErrorsCounters.Counts()[alias], int64(1); got != want {
if got, want := statsStreamingQueryErrorsCounters.Counts()[alias], int64(2); got != want {
t.Errorf("wrong number of errored streaming query for tablet: %v: got = %v, want = %v", alias, got, want)
}
if got, want := statsStreamingQueryCounters.Counts()[alias], int64(1); got != want {
Expand All @@ -619,7 +638,7 @@ func TestSplitCloneV2_Offline_FailOverStreamingQuery_NotAllowed(t *testing.T) {
// reading the last row.
func TestSplitCloneV2_Online_FailOverStreamingQuery(t *testing.T) {
tc := &splitCloneTestCase{t: t}
tc.setUpWithConcurreny(false /* v3 */, 1, 10, splitCloneTestRowsCount)
tc.setUpWithConcurrency(false /* v3 */, 1, 10, splitCloneTestRowsCount)
defer tc.tearDown()

// In the online phase we won't enable filtered replication. Don't expect it.
Expand Down Expand Up @@ -674,7 +693,7 @@ func TestSplitCloneV2_Online_FailOverStreamingQuery(t *testing.T) {
// available.
func TestSplitCloneV2_Online_TabletsUnavailableDuringRestart(t *testing.T) {
tc := &splitCloneTestCase{t: t}
tc.setUpWithConcurreny(false /* v3 */, 1, 10, splitCloneTestRowsCount)
tc.setUpWithConcurrency(false /* v3 */, 1, 10, splitCloneTestRowsCount)
defer tc.tearDown()

// In the online phase we won't enable filtered replication. Don't expect it.
Expand All @@ -694,15 +713,9 @@ func TestSplitCloneV2_Online_TabletsUnavailableDuringRestart(t *testing.T) {
tc.sourceRdonlyQs[0].AddHealthResponseWithNotServing()
})

// Only wait 1 ms between retries, so that the test passes faster.
*executeFetchRetryTime = 1 * time.Millisecond
// Let vtworker keep retrying and give up rather quickly because the test
// will be blocked until it finally fails.
defaultRetryDuration := *retryDuration
*retryDuration = 500 * time.Millisecond
defer func() {
*retryDuration = defaultRetryDuration
}()

// Run the vtworker command.
args := []string{"SplitClone",
Expand Down Expand Up @@ -759,14 +772,10 @@ func TestSplitCloneV2_Online_Offline(t *testing.T) {
// When the online clone inserted the last rows, modify the destination test
// query service such that it will return them as well.
tc.leftMasterFakeDb.GetEntry(29).AfterFunc = func() {
for i := range []int{0, 1} {
tc.leftRdonlyQs[i].addGeneratedRows(100, 200)
}
tc.leftMasterQs.addGeneratedRows(100, 200)
}
tc.rightMasterFakeDb.GetEntry(29).AfterFunc = func() {
for i := range []int{0, 1} {
tc.rightRdonlyQs[i].addGeneratedRows(100, 200)
}
tc.rightMasterQs.addGeneratedRows(100, 200)
}

// Run the vtworker command.
Expand All @@ -791,7 +800,7 @@ func TestSplitCloneV2_Offline_Reconciliation(t *testing.T) {
tc := &splitCloneTestCase{t: t}
// We reduce the parallelism to 1 to test the order of expected
// insert/update/delete statements on the destination master.
tc.setUpWithConcurreny(false /* v3 */, 1, 10, splitCloneTestRowsCount)
tc.setUpWithConcurrency(false /* v3 */, 1, 10, splitCloneTestRowsCount)
defer tc.tearDown()

// We assume that an Online Clone ran before which copied the rows 100-199
Expand All @@ -809,15 +818,13 @@ func TestSplitCloneV2_Offline_Reconciliation(t *testing.T) {
qs.addGeneratedRows(100, 190)
}

for i := range []int{0, 1} {
// The destination has rows 100-190 with the source in common.
// Rows 191-200 are extraenous on the destination.
tc.leftRdonlyQs[i].addGeneratedRows(100, 200)
tc.rightRdonlyQs[i].addGeneratedRows(100, 200)
// But some data is outdated data and must be updated.
tc.leftRdonlyQs[i].modifyFirstRows(2)
tc.rightRdonlyQs[i].modifyFirstRows(2)
}
// The destination has rows 100-190 with the source in common.
// Rows 191-200 are extraneous on the destination.
tc.leftMasterQs.addGeneratedRows(100, 200)
tc.rightMasterQs.addGeneratedRows(100, 200)
// But some data is outdated data and must be updated.
tc.leftMasterQs.modifyFirstRows(2)
tc.rightMasterQs.modifyFirstRows(2)

// The destination tablets should see inserts, updates and deletes.
// Clear the entries added by setUp() because the reconciliation will
Expand Down Expand Up @@ -897,11 +904,12 @@ func TestSplitCloneV2_RetryDueToReadonly(t *testing.T) {
tc.setUp(false /* v3 */)
defer tc.tearDown()

// Only wait 1 ms between retries, so that the test passes faster.
*executeFetchRetryTime = 1 * time.Millisecond

// Provoke a retry to test the error handling.
tc.leftMasterFakeDb.AddExpectedQueryAtIndex(0, "INSERT INTO `vt_ks`.`table1` (`id`, `msg`, `keyspace_id`) VALUES (*", errReadOnly)
tc.rightMasterFakeDb.AddExpectedQueryAtIndex(0, "INSERT INTO `vt_ks`.`table1` (`id`, `msg`, `keyspace_id`) VALUES (*", errReadOnly)
// Only wait 1 ms between retries, so that the test passes faster.
*executeFetchRetryTime = 1 * time.Millisecond

// Run the vtworker command.
if err := runCommand(t, tc.wi, tc.wi.wr, tc.defaultWorkerArgs); err != nil {
Expand All @@ -926,6 +934,9 @@ func TestSplitCloneV2_RetryDueToReparent(t *testing.T) {
tc.setUp(false /* v3 */)
defer tc.tearDown()

// Only wait 1 ms between retries, so that the test passes faster.
*executeFetchRetryTime = 1 * time.Millisecond

// Provoke a reparent just before the copy finishes.
// leftReplica will take over for the last, 30th, insert and the BLP checkpoint.
tc.leftReplicaFakeDb.AddExpectedQuery("INSERT INTO `vt_ks`.`table1` (`id`, `msg`, `keyspace_id`) VALUES (*", nil)
Expand Down Expand Up @@ -965,9 +976,6 @@ func TestSplitCloneV2_RetryDueToReparent(t *testing.T) {
// => vtworker has no MASTER to go to and will keep retrying.
}

// Only wait 1 ms between retries, so that the test passes faster.
*executeFetchRetryTime = 1 * time.Millisecond

// Run the vtworker command.
if err := runCommand(t, tc.wi, tc.wi.wr, tc.defaultWorkerArgs); err != nil {
t.Fatal(err)
Expand All @@ -987,12 +995,16 @@ func TestSplitCloneV2_NoMasterAvailable(t *testing.T) {
tc.setUp(false /* v3 */)
defer tc.tearDown()

// Only wait 1 ms between retries, so that the test passes faster.
*executeFetchRetryTime = 1 * time.Millisecond

// leftReplica will take over for the last, 30th, insert and the BLP checkpoint.
tc.leftReplicaFakeDb.AddExpectedQuery("INSERT INTO `vt_ks`.`table1` (`id`, `msg`, `keyspace_id`) VALUES (*", nil)
expectBlpCheckpointCreationQueries(tc.leftReplicaFakeDb)

// During the 29th write, let the MASTER disappear.
tc.leftMasterFakeDb.GetEntry(28).AfterFunc = func() {
t.Logf("setting MASTER tablet to REPLICA")
tc.leftMasterQs.UpdateType(topodatapb.TabletType_REPLICA)
tc.leftMasterQs.AddDefaultHealthResponse()
}
Expand All @@ -1019,7 +1031,9 @@ func TestSplitCloneV2_NoMasterAvailable(t *testing.T) {
defer cancel()

for {
if statsRetryCounters.Counts()[retryCategoryNoMasterAvailable] >= 1 {
retries := statsRetryCounters.Counts()[retryCategoryNoMasterAvailable]
if retries >= 1 {
t.Logf("retried on no MASTER %v times", retries)
break
}

Expand All @@ -1032,13 +1046,11 @@ func TestSplitCloneV2_NoMasterAvailable(t *testing.T) {
}

// Make leftReplica the new MASTER.
t.Logf("resetting tablet back to MASTER")
tc.leftReplicaQs.UpdateType(topodatapb.TabletType_MASTER)
tc.leftReplicaQs.AddDefaultHealthResponse()
}()

// Only wait 1 ms between retries, so that the test passes faster.
*executeFetchRetryTime = 1 * time.Millisecond

// Run the vtworker command.
if err := runCommand(t, tc.wi, tc.wi.wr, tc.defaultWorkerArgs); err != nil {
t.Fatal(err)
Expand Down
Loading