diff --git a/go/mysql/sqlerror/constants.go b/go/mysql/sqlerror/constants.go index 7f1f140dd94..2f4e3e045c9 100644 --- a/go/mysql/sqlerror/constants.go +++ b/go/mysql/sqlerror/constants.go @@ -84,11 +84,12 @@ const ( ERKeyDoesNotExist = ErrorCode(1176) // permissions - ERDBAccessDenied = ErrorCode(1044) - ERAccessDeniedError = ErrorCode(1045) - ERKillDenied = ErrorCode(1095) - ERNoPermissionToCreateUsers = ErrorCode(1211) - ERSpecifiedAccessDenied = ErrorCode(1227) + ERDBAccessDenied = ErrorCode(1044) + ERAccessDeniedError = ErrorCode(1045) + ERKillDenied = ErrorCode(1095) + ERNoPermissionToCreateUsers = ErrorCode(1211) + ERSpecifiedAccessDenied = ErrorCode(1227) + ERBinlogCreateRoutineNeedSuper = ErrorCode(1419) // failed precondition ERNoDb = ErrorCode(1046) diff --git a/go/test/endtoend/vreplication/fk_test.go b/go/test/endtoend/vreplication/fk_test.go index a349a94ffa1..5566dcbf6bc 100644 --- a/go/test/endtoend/vreplication/fk_test.go +++ b/go/test/endtoend/vreplication/fk_test.go @@ -29,6 +29,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/log" + vttablet "vitess.io/vitess/go/vt/vttablet/common" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) @@ -101,6 +102,49 @@ func TestFKWorkflow(t *testing.T) { waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) targetKs := vc.Cells[cellName].Keyspaces[targetKeyspace] targetTab := targetKs.Shards["0"].Tablets[fmt.Sprintf("%s-%d", cellName, targetTabletId)].Vttablet + + // Stop the LoadSimulator while we are testing for workflow error, so that + // we don't error out in the LoadSimulator as we will be shutting down source dbServer. + if withLoad { + cancel() + <-ch + } + + sourceTab := vc.Cells[cellName].Keyspaces[sourceKeyspace].Shards["0"].Tablets[fmt.Sprintf("%s-%d", cellName, 100)] + + // Stop the source database server to simulate an error during replication phase + // This should cause recoverable errors that atomic workflows should retry + // as it is already out of copy phase. + err := sourceTab.DbServer.Stop() + require.NoError(t, err) + + // Give some time for the workflow to encounter errors and potentially retry + time.Sleep(2 * vttablet.GetDefaultVReplicationConfig().RetryDelay) + + // Verify workflow is still running and hasn't terminated due to errors + waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) + + // Restart the source database to allow workflow to continue + err = sourceTab.DbServer.StartProvideInit(false) + require.NoError(t, err) + + err = vc.VtctldClient.ExecuteCommand("SetWritable", fmt.Sprintf("%s-%d", cellName, 100), "true") + require.NoError(t, err) + + // Restart the LoadSimulator. + if withLoad { + ctx, cancel = context.WithCancel(context.Background()) + ls = newFKLoadSimulator(t, ctx) + defer func() { + select { + case <-ctx.Done(): + default: + cancel() + } + }() + go ls.simulateLoad() + } + require.NotNil(t, targetTab) catchup(t, targetTab, workflowName, "MoveTables") vdiff(t, targetKeyspace, workflowName, cellName, nil) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 3741c5dcf63..991925d9a6e 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -319,9 +319,19 @@ func (vsm *vstreamManager) GetTotalStreamDelay() int64 { func (vs *vstream) stream(ctx context.Context) error { ctx, vs.cancel = context.WithCancel(ctx) - defer vs.cancel() - go vs.sendEvents(ctx) + vs.wg.Add(1) + go func() { + defer vs.wg.Done() + + // sendEvents returns either if the given context has been canceled or if + // an error is returned from the callback. If the callback returns an error, + // we need to cancel the context to stop the other stream goroutines + // and to unblock the VStream call. + defer vs.cancel() + + vs.sendEvents(ctx) + }() // Make a copy first, because the ShardGtids list can change once streaming starts. copylist := append(([]*binlogdatapb.ShardGtid)(nil), vs.vgtid.ShardGtids...) @@ -358,6 +368,7 @@ func (vs *vstream) sendEvents(ctx context.Context) { } return nil } + for { select { case <-ctx.Done(): diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 1e59c1f94ff..58a7dfb5218 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -20,11 +20,9 @@ import ( "context" "fmt" "os" - "reflect" "runtime/pprof" "strings" "sync" - "sync/atomic" "testing" "time" @@ -113,16 +111,25 @@ func TestVStreamSkew(t *testing.T) { vgtid.ShardGtids = append(vgtid.ShardGtids, &binlogdatapb.ShardGtid{Keyspace: ks, Gtid: "pos", Shard: "20-40"}) go stream(sbc1, ks, "20-40", tcase.numEventsPerShard, tcase.shard1idx) } - ch := startVStream(ctx, t, vsm, vgtid, &vtgatepb.VStreamFlags{MinimizeSkew: true}) - var receivedEvents []*binlogdatapb.VEvent - for len(receivedEvents) < int(want) { - select { - case <-time.After(1 * time.Minute): - require.FailNow(t, "test timed out") - case response := <-ch: - receivedEvents = append(receivedEvents, response.Events...) + + vstreamCtx, vstreamCancel := context.WithTimeout(ctx, 1*time.Minute) + defer vstreamCancel() + + receivedEvents := make([]*binlogdatapb.VEvent, 0) + err := vsm.VStream(vstreamCtx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{MinimizeSkew: true}, func(events []*binlogdatapb.VEvent) error { + receivedEvents = append(receivedEvents, events...) + + if int64(len(receivedEvents)) == want { + // Stop streaming after receiving both expected responses. + vstreamCancel() } - } + + return nil + }) + + require.Error(t, err) + require.ErrorIs(t, vterrors.UnwrapAll(err), context.Canceled) + require.Equal(t, int(want), int(len(receivedEvents))) require.Equal(t, tcase.expectedDelays, vsm.GetTotalStreamDelay()-previousDelays) previousDelays = vsm.GetTotalStreamDelay() @@ -262,23 +269,26 @@ func TestVStreamEvents(t *testing.T) { Gtid: "pos", }}, } - ch := make(chan *binlogdatapb.VStreamResponse) - go func() { - err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { - ch <- &binlogdatapb.VStreamResponse{Events: events} - return nil - }) - wantErr := "context canceled" - if err == nil || !strings.Contains(err.Error(), wantErr) { - t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr) + + vstreamCtx, vstreamCancel := context.WithCancel(ctx) + defer vstreamCancel() + + receivedEvents := make([]*binlogdatapb.VStreamResponse, 0) + err := vsm.VStream(vstreamCtx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { + receivedEvents = append(receivedEvents, &binlogdatapb.VStreamResponse{Events: events}) + + if len(receivedEvents) == 2 { + // Stop streaming after receiving both expected responses. + vstreamCancel() } - ch <- nil - }() - verifyEvents(t, ch, want1, want2) - // Ensure the go func error return was verified. - cancel() - <-ch + return nil + }) + + require.Error(t, err) + require.ErrorIs(t, vterrors.UnwrapAll(err), context.Canceled) + + require.ElementsMatch(t, []*binlogdatapb.VStreamResponse{want1, want2}, receivedEvents) } func BenchmarkVStreamEvents(b *testing.B) { @@ -415,7 +425,6 @@ func TestVStreamChunks(t *testing.T) { rowEncountered := false doneCounting := false - var rowCount, ddlCount atomic.Int32 vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ Keyspace: ks, @@ -427,7 +436,12 @@ func TestVStreamChunks(t *testing.T) { Gtid: "pos", }}, } - _ = vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { + + vstreamCtx, vstreamCancel := context.WithCancel(ctx) + defer vstreamCancel() + + var rowCount, ddlCount int + err := vsm.VStream(vstreamCtx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { switch events[0].Type { case binlogdatapb.VEventType_ROW: if doneCounting { @@ -435,30 +449,39 @@ func TestVStreamChunks(t *testing.T) { return fmt.Errorf("unexpected event: %v", events[0]) } rowEncountered = true - rowCount.Add(1) + rowCount += 1 + case binlogdatapb.VEventType_COMMIT: if !rowEncountered { t.Errorf("Unexpected event, COMMIT after non-rows: %v", events[0]) return fmt.Errorf("unexpected event: %v", events[0]) } doneCounting = true + case binlogdatapb.VEventType_DDL: if !doneCounting && rowEncountered { t.Errorf("Unexpected event, DDL during ROW events: %v", events[0]) return fmt.Errorf("unexpected event: %v", events[0]) } - ddlCount.Add(1) + ddlCount += 1 + default: t.Errorf("Unexpected event: %v", events[0]) return fmt.Errorf("unexpected event: %v", events[0]) } - if rowCount.Load() == int32(100) && ddlCount.Load() == int32(100) { - cancel() + + if rowCount == 100 && ddlCount == 100 { + vstreamCancel() } + return nil }) - assert.Equal(t, int32(100), rowCount.Load()) - assert.Equal(t, int32(100), ddlCount.Load()) + + require.Error(t, err) + require.ErrorIs(t, vterrors.UnwrapAll(err), context.Canceled) + + require.Equal(t, 100, rowCount) + require.Equal(t, 100, ddlCount) } func TestVStreamMulti(t *testing.T) { @@ -498,15 +521,34 @@ func TestVStreamMulti(t *testing.T) { Gtid: "pos", }}, } - ch := startVStream(ctx, t, vsm, vgtid, nil) - <-ch - response := <-ch + + vstreamCtx, vstreamCancel := context.WithCancel(ctx) + defer vstreamCancel() + + receivedEvents := make([]*binlogdatapb.VEvent, 0) + err := vsm.VStream(vstreamCtx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { + receivedEvents = append(receivedEvents, events...) + + if len(receivedEvents) == 4 { + // Stop streaming after receiving both expected responses. + vstreamCancel() + } + + return nil + }) + + require.Error(t, err) + require.ErrorIs(t, vterrors.UnwrapAll(err), context.Canceled) + + require.Equal(t, 4, len(receivedEvents)) + var got *binlogdatapb.VGtid - for _, ev := range response.Events { + for _, ev := range receivedEvents { if ev.Type == binlogdatapb.VEventType_VGTID { got = ev.Vgtid } } + want := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ Keyspace: ks, @@ -518,9 +560,8 @@ func TestVStreamMulti(t *testing.T) { Gtid: "gtid02", }}, } - if !proto.Equal(got, want) { - t.Errorf("VGtid:\n%v, want\n%v", got, want) - } + + require.ElementsMatch(t, got.ShardGtids, want.ShardGtids) } func TestVStreamsMetrics(t *testing.T) { @@ -566,52 +607,61 @@ func TestVStreamsMetrics(t *testing.T) { Gtid: "pos", }}, } - ch := startVStream(ctx, t, vsm, vgtid, nil) - <-ch - <-ch + expectedLabels1 := "TestVStream.-20.PRIMARY" expectedLabels2 := "TestVStream.20-40.PRIMARY" - wantVStreamsCreated := map[string]int64{ + vstreamCtx, vstreamCancel := context.WithCancel(ctx) + defer vstreamCancel() + + receivedResponses := make([]*binlogdatapb.VStreamResponse, 0) + err := vsm.VStream(vstreamCtx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { + receivedResponses = append(receivedResponses, &binlogdatapb.VStreamResponse{Events: events}) + + // While the VStream is running, we should see one active stream per shard. + require.Equal(t, map[string]int64{ + expectedLabels1: 1, + expectedLabels2: 1, + }, vsm.vstreamsCount.Counts()) + + if len(receivedResponses) == 2 { + // Stop streaming after receiving both expected responses. + vstreamCancel() + } + + return nil + }) + + require.Error(t, err) + require.ErrorIs(t, vterrors.UnwrapAll(err), context.Canceled) + + require.Equal(t, 2, len(receivedResponses)) + + // After the streams end, the count should go back to zero. + require.Equal(t, map[string]int64{ + expectedLabels1: 0, + expectedLabels2: 0, + }, vsm.vstreamsCount.Counts()) + + require.Equal(t, map[string]int64{ expectedLabels1: 1, expectedLabels2: 1, - } - waitForMetricsMatch(t, vsm.vstreamsCreated.Counts, wantVStreamsCreated) + }, vsm.vstreamsCreated.Counts()) - wantVStreamsLag := map[string]int64{ + require.Equal(t, map[string]int64{ expectedLabels1: 5, expectedLabels2: 7, - } - waitForMetricsMatch(t, vsm.vstreamsLag.Counts, wantVStreamsLag) + }, vsm.vstreamsLag.Counts()) - wantVStreamsCount := map[string]int64{ - expectedLabels1: 1, - expectedLabels2: 1, - } - waitForMetricsMatch(t, vsm.vstreamsCount.Counts, wantVStreamsCount) - - wantVEventsCount := map[string]int64{ + require.Equal(t, map[string]int64{ expectedLabels1: 2, expectedLabels2: 2, - } - waitForMetricsMatch(t, vsm.vstreamsEventsStreamed.Counts, wantVEventsCount) + }, vsm.vstreamsEventsStreamed.Counts()) - wantVStreamsEndedWithErrors := map[string]int64{ + require.Equal(t, map[string]int64{ expectedLabels1: 0, expectedLabels2: 0, - } - waitForMetricsMatch(t, vsm.vstreamsEndedWithErrors.Counts, wantVStreamsEndedWithErrors) -} - -func waitForMetricsMatch(t *testing.T, getActual func() map[string]int64, want map[string]int64) { - deadline := time.Now().Add(1 * time.Second) - for time.Now().Before(deadline) { - if reflect.DeepEqual(getActual(), want) { - return - } - time.Sleep(10 * time.Millisecond) - } - assert.Equal(t, want, getActual(), "metrics did not match within timeout") + }, vsm.vstreamsEndedWithErrors.Counts()) } func TestVStreamsMetricsErrors(t *testing.T) { @@ -654,18 +704,21 @@ func TestVStreamsMetricsErrors(t *testing.T) { Gtid: "pos", }}, } - ch := make(chan *binlogdatapb.VStreamResponse) - done := make(chan struct{}) - var err error - go func() { - err = vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { - ch <- &binlogdatapb.VStreamResponse{Events: events} - return nil - }) - close(done) - }() - <-ch - <-done + + vstreamCtx, vstreamCancel := context.WithCancel(ctx) + defer vstreamCancel() + + results := make([]*binlogdatapb.VStreamResponse, 0) + err := vsm.VStream(vstreamCtx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { + results = append(results, &binlogdatapb.VStreamResponse{Events: events}) + + if len(results) == 2 { + // We should never actually see 2 responses come in + vstreamCancel() + } + + return nil + }) if err == nil || !strings.Contains(err.Error(), wantErr) { require.ErrorContains(t, err, wantErr) @@ -680,6 +733,61 @@ func TestVStreamsMetricsErrors(t *testing.T) { assert.Equal(t, wantVStreamsEndedWithErrors, vsm.vstreamsEndedWithErrors.Counts(), "vstreamsEndedWithErrors matches") } +func TestVStreamErrorInCallback(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Use a unique cell to avoid parallel tests interfering with each other's metrics + cell := "ac" + ks := "TestVStream" + _ = createSandbox(ks) + hc := discovery.NewFakeHealthCheck(nil) + st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) + vsm := newTestVStreamManager(ctx, hc, st, cell) + vsm.vstreamsCreated.ResetAll() + vsm.vstreamsLag.ResetAll() + vsm.vstreamsCount.ResetAll() + vsm.vstreamsEventsStreamed.ResetAll() + vsm.vstreamsEndedWithErrors.ResetAll() + sbc0 := hc.AddTestTablet(cell, "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) + addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) + sbc1 := hc.AddTestTablet(cell, "1.1.1.2", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) + addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet()) + + send1 := []*binlogdatapb.VEvent{ + {Type: binlogdatapb.VEventType_GTID, Gtid: "gtid01"}, + {Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 15 * 1e9}, + } + sbc0.AddVStreamEvents(send1, nil) + + send2 := []*binlogdatapb.VEvent{ + {Type: binlogdatapb.VEventType_GTID, Gtid: "gtid02"}, + {Type: binlogdatapb.VEventType_COMMIT, Timestamp: 10, CurrentTime: 17 * 1e9}, + } + sbc1.AddVStreamEvents(send2, nil) + + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: ks, + Shard: "-20", + Gtid: "pos", + }, { + Keyspace: ks, + Shard: "20-40", + Gtid: "pos", + }}, + } + + expectedError := fmt.Errorf("callback error") + + err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { + return expectedError + }) + + require.Error(t, err) + require.ErrorIs(t, vterrors.UnwrapAll(err), expectedError) +} + func TestVStreamRetriableErrors(t *testing.T) { type testCase struct { name string @@ -745,8 +853,6 @@ func TestVStreamRetriableErrors(t *testing.T) { {Type: binlogdatapb.VEventType_COMMIT}, } - want := &binlogdatapb.VStreamResponse{Events: commit} - for _, tcase := range tcases { t.Run(tcase.name, func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -787,47 +893,30 @@ func TestVStreamRetriableErrors(t *testing.T) { }}, } - ch := make(chan *binlogdatapb.VStreamResponse) - done := make(chan struct{}) - go func() { - err := vsm.VStream(ctx, topodatapb.TabletType_REPLICA, vgtid, nil, &vtgatepb.VStreamFlags{Cells: strings.Join(cells, ",")}, func(events []*binlogdatapb.VEvent) error { - ch <- &binlogdatapb.VStreamResponse{Events: events} - return nil - }) - wantErr := "context canceled" + vstreamCtx, vstreamCancel := context.WithCancel(ctx) + defer vstreamCancel() - if !tcase.shouldRetry { - wantErr = tcase.msg - } + err := vsm.VStream(vstreamCtx, topodatapb.TabletType_REPLICA, vgtid, nil, &vtgatepb.VStreamFlags{Cells: strings.Join(cells, ",")}, func(events []*binlogdatapb.VEvent) error { + defer vstreamCancel() - if err == nil || !strings.Contains(err.Error(), wantErr) { - t.Errorf("vstream end: %v, must contain %v", err.Error(), wantErr) - } - close(done) - }() + require.Equal(t, 1, len(events)) + require.Equal(t, commit, events) - Loop: - for { - if tcase.shouldRetry { - select { - case event := <-ch: - got := event.CloneVT() - if !proto.Equal(got, want) { - t.Errorf("got different vstream event than expected") - } - cancel() - case <-done: - // The goroutine has completed, so break out of the loop - break Loop - } - } else { - <-done - break Loop - } + return nil + }) + + if tcase.shouldRetry { + // Expect a cancel error because the stream was retried and our callback + // was called. + require.Error(t, err) + require.ErrorIs(t, vterrors.UnwrapAll(err), context.Canceled) + } else { + // Expect the original error because no retry was done. + require.Error(t, err) + require.ErrorContains(t, err, tcase.msg) } }) } - } func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) { @@ -876,8 +965,26 @@ func TestVStreamShouldNotSendSourceHeartbeats(t *testing.T) { Gtid: "pos", }}, } - ch := startVStream(ctx, t, vsm, vgtid, nil) - verifyEvents(t, ch, want) + + vstreamCtx, vstreamCancel := context.WithCancel(ctx) + defer vstreamCancel() + + receivedResponses := make([]*binlogdatapb.VStreamResponse, 0) + err := vsm.VStream(vstreamCtx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { + receivedResponses = append(receivedResponses, &binlogdatapb.VStreamResponse{Events: events}) + + if len(receivedResponses) == 1 { + // Stop streaming after receiving the expected response. + vstreamCancel() + } + return nil + }) + + require.Error(t, err) + require.ErrorIs(t, vterrors.UnwrapAll(err), context.Canceled) + + require.Equal(t, 1, len(receivedResponses)) + require.EqualExportedValues(t, want, receivedResponses[0]) } func TestVStreamJournalOneToMany(t *testing.T) { @@ -962,14 +1069,35 @@ func TestVStreamJournalOneToMany(t *testing.T) { Gtid: "pos", }}, } - ch := startVStream(ctx, t, vsm, vgtid, nil) - verifyEvents(t, ch, want1) - // The following two events from the different shards can come in any order. - // But the resulting VGTID should be the same after both are received. - <-ch - got := <-ch - wantevent := &binlogdatapb.VEvent{ + vstreamCtx, vstreamCancel := context.WithCancel(ctx) + defer vstreamCancel() + + receivedEvents := make([]*binlogdatapb.VStreamResponse, 0) + err := vsm.VStream(vstreamCtx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { + receivedEvents = append(receivedEvents, &binlogdatapb.VStreamResponse{Events: events}) + + if len(receivedEvents) == 3 { + // Stop streaming after receiving all expected responses. + vstreamCancel() + } + + return nil + }) + + require.Error(t, err) + require.ErrorIs(t, vterrors.UnwrapAll(err), context.Canceled) + + require.Equal(t, 3, len(receivedEvents)) + + // First event should be the first transaction from the first shard. + require.EqualExportedValues(t, want1, receivedEvents[0]) + + // The second and third events can come in any order. + // So instead of comparing them directly, we simply verify that the GTID + // after the last event is the expected combined GTID. + + require.EqualExportedValues(t, &binlogdatapb.VEvent{ Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ @@ -982,13 +1110,7 @@ func TestVStreamJournalOneToMany(t *testing.T) { Gtid: "gtid04", }}, }, - } - gotEvent := got.Events[0] - gotEvent.Keyspace = "" - gotEvent.Shard = "" - if !proto.Equal(gotEvent, wantevent) { - t.Errorf("vgtid: %v, want %v", got.Events[0], wantevent) - } + }, receivedEvents[2].Events[0]) } func TestVStreamJournalManyToOne(t *testing.T) { @@ -1081,12 +1203,28 @@ func TestVStreamJournalManyToOne(t *testing.T) { Gtid: "pos1020", }}, } - ch := startVStream(ctx, t, vsm, vgtid, nil) - // The following two events from the different shards can come in any order. - // But the resulting VGTID should be the same after both are received. - <-ch - got := <-ch - wantevent := &binlogdatapb.VEvent{ + + vstreamCtx, vstreamCancel := context.WithCancel(ctx) + defer vstreamCancel() + + receivedResponses := make([]*binlogdatapb.VStreamResponse, 0) + err := vsm.VStream(vstreamCtx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { + receivedResponses = append(receivedResponses, &binlogdatapb.VStreamResponse{Events: events}) + + if len(receivedResponses) == 3 { + // Stop streaming after receiving all expected responses. + vstreamCancel() + } + + return nil + }) + + require.Error(t, err) + require.ErrorIs(t, vterrors.UnwrapAll(err), context.Canceled) + + require.Equal(t, 3, len(receivedResponses)) + + require.EqualExportedValues(t, &binlogdatapb.VEvent{ Type: binlogdatapb.VEventType_VGTID, Vgtid: &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ @@ -1099,14 +1237,9 @@ func TestVStreamJournalManyToOne(t *testing.T) { Gtid: "gtid04", }}, }, - } - gotEvent := got.Events[0] - gotEvent.Keyspace = "" - gotEvent.Shard = "" - if !proto.Equal(gotEvent, wantevent) { - t.Errorf("vgtid: %v, want %v", got.Events[0], wantevent) - } - verifyEvents(t, ch, want1) + }, receivedResponses[1].Events[0]) + + require.EqualExportedValues(t, want1, receivedResponses[2]) } func TestVStreamJournalNoMatch(t *testing.T) { @@ -1233,8 +1366,32 @@ func TestVStreamJournalNoMatch(t *testing.T) { Gtid: "pos", }}, } - ch := startVStream(ctx, t, vsm, vgtid, nil) - verifyEvents(t, ch, want1, wantjn1, want2, wantjn2, want3) + + vstreamCtx, vstreamCancel := context.WithCancel(ctx) + defer vstreamCancel() + + receivedResponses := make([]*binlogdatapb.VStreamResponse, 0) + err := vsm.VStream(vstreamCtx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { + receivedResponses = append(receivedResponses, &binlogdatapb.VStreamResponse{Events: events}) + + if len(receivedResponses) == 5 { + // Stop streaming after receiving all expected responses. + vstreamCancel() + } + + return nil + }) + + require.Error(t, err) + require.ErrorIs(t, vterrors.UnwrapAll(err), context.Canceled) + + require.Equal(t, 5, len(receivedResponses)) + + require.EqualExportedValues(t, want1, receivedResponses[0]) + require.EqualExportedValues(t, wantjn1, receivedResponses[1]) + require.EqualExportedValues(t, want2, receivedResponses[2]) + require.EqualExportedValues(t, wantjn2, receivedResponses[3]) + require.EqualExportedValues(t, want3, receivedResponses[4]) } func TestVStreamJournalPartialMatch(t *testing.T) { @@ -1284,14 +1441,13 @@ func TestVStreamJournalPartialMatch(t *testing.T) { Gtid: "pos1020", }}, } + err := vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { - t.Errorf("unexpected events: %v", events) - return nil + return fmt.Errorf("unexpected events: %v", events) }) - wantErr := "not all journaling participants are in the stream" - if err == nil || !strings.Contains(err.Error(), wantErr) { - t.Errorf("vstream end: %v, must contain %v", err, wantErr) - } + + require.Error(t, err) + require.Contains(t, err.Error(), "not all journaling participants are in the stream") // Try a different order (different code path) send = []*binlogdatapb.VEvent{ @@ -1313,14 +1469,13 @@ func TestVStreamJournalPartialMatch(t *testing.T) { }}, } sbc2.AddVStreamEvents(send, nil) + err = vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { - t.Errorf("unexpected events: %v", events) - return nil + return fmt.Errorf("unexpected events: %v", events) }) - if err == nil || !strings.Contains(err.Error(), wantErr) { - t.Errorf("vstream end: %v, must contain %v", err, wantErr) - } - cancel() + + require.Error(t, err) + require.Contains(t, err.Error(), "not all journaling participants are in the stream") } func TestResolveVStreamParams(t *testing.T) { @@ -1569,27 +1724,25 @@ func TestVStreamIdleHeartbeat(t *testing.T) { } for _, tcase := range testcases { t.Run(tcase.name, func(t *testing.T) { - var mu sync.Mutex var heartbeatCount int - ctx, cancel := context.WithCancel(ctx) - go func() { - vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{HeartbeatInterval: tcase.heartbeatInterval}, - func(events []*binlogdatapb.VEvent) error { - mu.Lock() - defer mu.Unlock() - for _, event := range events { - if event.Type == binlogdatapb.VEventType_HEARTBEAT { - heartbeatCount++ - } - } - return nil - }) - }() - time.Sleep(time.Duration(4500) * time.Millisecond) - mu.Lock() - defer mu.Unlock() + + vstreamCtx, vstreamCancel := context.WithTimeout(ctx, time.Duration(4500)*time.Millisecond) + defer vstreamCancel() + + err := vsm.VStream(vstreamCtx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{HeartbeatInterval: tcase.heartbeatInterval}, func(events []*binlogdatapb.VEvent) error { + for _, event := range events { + if event.Type == binlogdatapb.VEventType_HEARTBEAT { + heartbeatCount++ + } + } + + return nil + }) + + require.Error(t, err) + require.ErrorIs(t, vterrors.UnwrapAll(err), context.DeadlineExceeded) + require.Equalf(t, heartbeatCount, tcase.want, "got %d, want %d", heartbeatCount, tcase.want) - cancel() }) } } @@ -1972,26 +2125,27 @@ func TestVStreamManagerHealthCheckResponseHandling(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - done := make(chan struct{}) - go func() { - sctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - defer close(done) - // SandboxConn's VStream implementation always waits for the context to timeout. - err := vsm.VStream(sctx, tabletType, vgtid, nil, nil, func(events []*binlogdatapb.VEvent) error { - require.Fail(t, "unexpected event", "Received unexpected events: %v", events) - return nil - }) - if tc.wantErr != "" { // Otherwise we simply expect the context to timeout - if !strings.Contains(logger.String(), tc.wantErr) { - require.Fail(t, "unexpected vstream error", "vstream ended with error: %v, which did not contain: %s", err, tc.wantErr) - } - } - }() if tc.wantErr != "" { source.SetStreamHealthResponse(tc.hcRes) } - <-done + + vstreamCtx, vstreamCancel := context.WithTimeout(ctx, 5*time.Second) + defer vstreamCancel() + + // SandboxConn's VStream implementation always waits for the context to timeout. + err := vsm.VStream(vstreamCtx, tabletType, vgtid, nil, nil, func(events []*binlogdatapb.VEvent) error { + return fmt.Errorf("unexpected events: %v", events) + }) + + if tc.wantErr != "" { + require.Error(t, err) + require.Contains(t, logger.String(), tc.wantErr) + } else { + // Otherwise we simply expect the context to timeout + require.Error(t, err) + require.ErrorIs(t, vterrors.UnwrapAll(err), context.DeadlineExceeded) + } + logger.Clear() }) } @@ -2003,36 +2157,6 @@ func newTestVStreamManager(ctx context.Context, hc discovery.HealthCheck, serv s return newVStreamManager(srvResolver, serv, cell) } -func startVStream(ctx context.Context, t *testing.T, vsm *vstreamManager, vgtid *binlogdatapb.VGtid, flags *vtgatepb.VStreamFlags) <-chan *binlogdatapb.VStreamResponse { - t.Helper() - if flags == nil { - flags = &vtgatepb.VStreamFlags{} - } - ch := make(chan *binlogdatapb.VStreamResponse) - go func() { - _ = vsm.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, nil, flags, func(events []*binlogdatapb.VEvent) error { - ch <- &binlogdatapb.VStreamResponse{Events: events} - return nil - }) - }() - return ch -} - -func verifyEvents(t *testing.T, ch <-chan *binlogdatapb.VStreamResponse, wants ...*binlogdatapb.VStreamResponse) { - t.Helper() - for i, want := range wants { - val := <-ch - got := val.CloneVT() - require.NotNil(t, got) - for _, event := range got.Events { - event.Timestamp = 0 - } - if !proto.Equal(got, want) { - t.Errorf("vstream(%d):\n%v, want\n%v", i, got, want) - } - } -} - func getVEvents(keyspace, shard string, count, idx int64) []*binlogdatapb.VEvent { mu.Lock() defer mu.Unlock() @@ -2096,3 +2220,18 @@ func addTabletToSandboxTopo(tb testing.TB, ctx context.Context, st *sandboxTopo, err = st.topoServer.CreateTablet(ctx, tablet) require.NoError(tb, err) } + +func verifyEvents(t *testing.T, ch <-chan *binlogdatapb.VStreamResponse, wants ...*binlogdatapb.VStreamResponse) { + t.Helper() + for i, want := range wants { + val := <-ch + got := val.CloneVT() + require.NotNil(t, got) + for _, event := range got.Events { + event.Timestamp = 0 + } + if !proto.Equal(got, want) { + t.Errorf("vstream(%d):\n%v, want\n%v", i, got, want) + } + } +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 113cb2314a0..e75c4fea0e4 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -305,8 +305,8 @@ func (ct *controller) runBlp(ctx context.Context) (err error) { // it's a FAILED_PRECONDITION vterror, OR we cannot identify this as // non-recoverable BUT it has persisted beyond the retry limit // (maxTimeToRetryError). In addition, we cannot restart a workflow - // started with AtomicCopy which has _any_ error. - if (err != nil && vr.WorkflowSubType == int32(binlogdatapb.VReplicationWorkflowSubType_AtomicCopy)) || + // started with AtomicCopy which has _any_ error during copy phase. + if (err != nil && vr.WorkflowSubType == int32(binlogdatapb.VReplicationWorkflowSubType_AtomicCopy) && vr.state == binlogdatapb.VReplicationWorkflowState_Copying) || isUnrecoverableError(err) || !ct.lastWorkflowError.ShouldRetry() { err = vterrors.Wrapf(err, TerminalErrorIndicator) diff --git a/go/vt/vttablet/tabletmanager/vreplication/utils.go b/go/vt/vttablet/tabletmanager/vreplication/utils.go index cff53ce55ee..006834d2815 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/utils.go +++ b/go/vt/vttablet/tabletmanager/vreplication/utils.go @@ -212,6 +212,7 @@ func isUnrecoverableError(err error) bool { sqlerror.ErrWrongValueForType, sqlerror.ERSPDoesNotExist, sqlerror.ERSpecifiedAccessDenied, + sqlerror.ERBinlogCreateRoutineNeedSuper, sqlerror.ERSyntaxError, sqlerror.ERTooBigRowSize, sqlerror.ERTooBigSet, diff --git a/go/vt/vttablet/tabletmanager/vreplication/utils_test.go b/go/vt/vttablet/tabletmanager/vreplication/utils_test.go index 2406796aace..b1d7b7d6d67 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/utils_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/utils_test.go @@ -161,6 +161,11 @@ func TestIsUnrecoverableError(t *testing.T) { err: sqlerror.NewSQLError(sqlerror.ERErrorDuringCommit, "unknown", "ERROR HY000: Got error 149 - 'Lock deadlock; Retry transaction' during COMMIT"), expected: false, }, + { + name: "SQL error with ERBinlogCreateRoutineNeedSuper", + err: sqlerror.NewSQLError(sqlerror.ERBinlogCreateRoutineNeedSuper, "unknown", "error applying event: You do not have the SUPER privilege and binary logging is enabled (you *might* want to use the less safe log_bin_trust_function_creators variable) (errno 1419) (sqlstate HY000) during query: CREATE DEFINER=`root`@`localhost` TRIGGER upd_customer BEFORE UPDATE ON customer FOR EACH ROW SET @email = NEW.email + \" (updated)\""), + expected: true, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) {