-
Notifications
You must be signed in to change notification settings - Fork 2.3k
VSCopy: Send COPY_COMPLETED events when the copy operation is done #11740
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e0f36b2
a91277a
5a6440b
8912112
5825b27
9e4b5d8
f4ac20a
149a150
076327d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -169,7 +169,7 @@ func TestVStreamCopyBasic(t *testing.T) { | |
| gconn, conn, mconn, closeConnections := initialize(ctx, t) | ||
| defer closeConnections() | ||
|
|
||
| _, err := conn.ExecuteFetch("insert into t1(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) | ||
| _, err := conn.ExecuteFetch("insert into t1_copy_basic(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
|
|
@@ -180,7 +180,7 @@ func TestVStreamCopyBasic(t *testing.T) { | |
| } | ||
| qr := sqltypes.ResultToProto3(&lastPK) | ||
| tablePKs := []*binlogdatapb.TableLastPK{{ | ||
| TableName: "t1", | ||
| TableName: "t1_copy_basic", | ||
| Lastpk: qr, | ||
| }} | ||
| var shardGtids []*binlogdatapb.ShardGtid | ||
|
|
@@ -200,8 +200,8 @@ func TestVStreamCopyBasic(t *testing.T) { | |
| vgtid.ShardGtids = shardGtids | ||
| filter := &binlogdatapb.Filter{ | ||
| Rules: []*binlogdatapb.Rule{{ | ||
| Match: "t1", | ||
| Filter: "select * from t1", | ||
| Match: "t1_copy_basic", | ||
| Filter: "select * from t1_copy_basic", | ||
| }}, | ||
| } | ||
| flags := &vtgatepb.VStreamFlags{} | ||
|
|
@@ -210,19 +210,44 @@ func TestVStreamCopyBasic(t *testing.T) { | |
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| numExpectedEvents := 2 /* num shards */ * (7 /* begin/field/vgtid:pos/2 rowevents avg/vgitd: lastpk/commit) */ + 3 /* begin/vgtid/commit for completed table */) | ||
| numExpectedEvents := 2 /* num shards */ *(7 /* begin/field/vgtid:pos/2 rowevents avg/vgitd: lastpk/commit) */ +3 /* begin/vgtid/commit for completed table */ +1 /* copy operation completed */) + 1 /* fully copy operation completed */ | ||
| expectedCompletedEvents := []string{ | ||
| `type:COPY_COMPLETED keyspace:"ks" shard:"-80"`, | ||
| `type:COPY_COMPLETED keyspace:"ks" shard:"80-"`, | ||
| `type:COPY_COMPLETED`, | ||
| } | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With this PR change, the above three events are added to the expected events. |
||
| require.NotNil(t, reader) | ||
| var evs []*binlogdatapb.VEvent | ||
| var completedEvs []*binlogdatapb.VEvent | ||
| for { | ||
| e, err := reader.Recv() | ||
| switch err { | ||
| case nil: | ||
| evs = append(evs, e...) | ||
|
|
||
| for _, ev := range e { | ||
| if ev.Type == binlogdatapb.VEventType_COPY_COMPLETED { | ||
| completedEvs = append(completedEvs, ev) | ||
| } | ||
| } | ||
|
|
||
| printEvents(evs) // for debugging ci failures | ||
|
|
||
| if len(evs) == numExpectedEvents { | ||
| // The arrival order of COPY_COMPLETED events with keyspace/shard is not constant. | ||
| // On the other hand, the last event should always be a fully COPY_COMPLETED event. | ||
| // That's why the sort.Slice doesn't have to handle the last element in completedEvs. | ||
| sort.Slice(completedEvs[:len(completedEvs)-1], func(i, j int) bool { | ||
| return completedEvs[i].GetShard() < completedEvs[j].GetShard() | ||
| }) | ||
| for i, ev := range completedEvs { | ||
| require.Regexp(t, expectedCompletedEvents[i], ev.String()) | ||
| } | ||
| t.Logf("TestVStreamCopyBasic was successful") | ||
| return | ||
| } else if numExpectedEvents < len(evs) { | ||
| t.Fatalf("len(events)=%v are not expected\n", len(evs)) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Previously, once
5825b27 makes the same test fail faster and shows more straightforward error messages. |
||
| } | ||
| printEvents(evs) // for debugging ci failures | ||
| case io.EOF: | ||
| log.Infof("stream ended\n") | ||
| cancel() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -101,6 +101,9 @@ type vstream struct { | |
| // the timestamp of the most recent event, keyed by streamId. streamId is of the form <keyspace>.<shard> | ||
| timestamps map[string]int64 | ||
|
|
||
| // the shard map tracking the copy completion, keyed by streamId. streamId is of the form <keyspace>.<shard> | ||
| copyCompletedShard map[string]struct{} | ||
|
|
||
| vsm *vstreamManager | ||
|
|
||
| eventCh chan []*binlogdatapb.VEvent | ||
|
|
@@ -152,6 +155,7 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta | |
| eventCh: make(chan []*binlogdatapb.VEvent), | ||
| heartbeatInterval: flags.GetHeartbeatInterval(), | ||
| ts: ts, | ||
| copyCompletedShard: make(map[string]struct{}), | ||
| } | ||
| return vs.stream(ctx) | ||
| } | ||
|
|
@@ -544,6 +548,22 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha | |
| return err | ||
| } | ||
|
|
||
| if err := vs.sendAll(ctx, sgtid, eventss); err != nil { | ||
| return err | ||
| } | ||
| eventss = nil | ||
| sendevents = nil | ||
mattlord marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| case binlogdatapb.VEventType_COPY_COMPLETED: | ||
| sendevents = append(sendevents, event) | ||
| if fullyCopied, doneEvent := vs.isCopyFullyCompleted(ctx, sgtid, event); fullyCopied { | ||
| sendevents = append(sendevents, doneEvent) | ||
| } | ||
| eventss = append(eventss, sendevents) | ||
|
|
||
| if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if err := vs.sendAll(ctx, sgtid, eventss); err != nil { | ||
| return err | ||
| } | ||
|
|
@@ -676,6 +696,25 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e | |
| return nil | ||
| } | ||
|
|
||
| // isCopyFullyCompleted returns true if all stream has received a copy_completed event. | ||
| // If true, it will also return a new copy_completed event that needs to be sent. | ||
| // This new event represents the completion of all the copy operations. | ||
| func (vs *vstream) isCopyFullyCompleted(ctx context.Context, sgtid *binlogdatapb.ShardGtid, event *binlogdatapb.VEvent) (bool, *binlogdatapb.VEvent) { | ||
| vs.mu.Lock() | ||
| defer vs.mu.Unlock() | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The function generates an additional event while holding a lock for |
||
|
|
||
| vs.copyCompletedShard[fmt.Sprintf("%s/%s", event.Keyspace, event.Shard)] = struct{}{} | ||
|
|
||
| for _, shard := range vs.vgtid.ShardGtids { | ||
| if _, ok := vs.copyCompletedShard[fmt.Sprintf("%s/%s", shard.Keyspace, shard.Shard)]; !ok { | ||
| return false, nil | ||
| } | ||
| } | ||
| return true, &binlogdatapb.VEvent{ | ||
| Type: binlogdatapb.VEventType_COPY_COMPLETED, | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This event has neither keyspace nor shard. |
||
| } | ||
| } | ||
|
|
||
| func (vs *vstream) getError() error { | ||
| vs.errMu.Lock() | ||
| defer vs.errMu.Unlock() | ||
|
|
||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To pass the
End-to-End Test,I prepared the exclusive table namedt1_copy_basic.9e4b5d8The problem sharing
t1table with the other test cases (e.g. TestConsistentLookupMultiInsertIgnore) running concurrently is not introduced by this PR. Although the number of the received events matches by chance, content of the events vary depending on the concurrent tests in the first place.Since this discrepancy caused the test failure in my PR, I fixed it.