From 94a8c7e48ad06a403a5b0364dfb61f0243daabd2 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sun, 28 Jun 2020 11:25:32 +0200 Subject: [PATCH 1/2] VStream Client: send current position events immediately Signed-off-by: Rohit Nayak --- go/vt/vtgate/endtoend/vstream_test.go | 58 +++++++++++++++++++++++++++ go/vt/vtgate/vstream_manager.go | 2 +- 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index 301cda7a753..e2632077e08 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -210,6 +210,64 @@ func TestVStreamCopyBasic(t *testing.T) { } } +func TestVStreamCurrent(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + gconn, conn, mconn, closeConnections := initialize(ctx, t) + defer closeConnections() + + _, err := conn.ExecuteFetch("insert into t1(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) + if err != nil { + t.Fatal(err) + } + + var shardGtids []*binlogdatapb.ShardGtid + var vgtid = &binlogdatapb.VGtid{} + shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ + Keyspace: "ks", + Shard: "-80", + Gtid: "current", + }) + shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ + Keyspace: "ks", + Shard: "80-", + Gtid: "current", + }) + vgtid.ShardGtids = shardGtids + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1", + }}, + } + reader, err := gconn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter) + _, _ = conn, mconn + if err != nil { + t.Fatal(err) + } + numExpectedEvents := 4 // vgtid+other per shard for "current" + require.NotNil(t, reader) + var evs []*binlogdatapb.VEvent + for { + e, err := reader.Recv() + switch err { + case nil: + evs = append(evs, e...) + printEvents(evs) // for debugging ci failures + if len(evs) == numExpectedEvents { + t.Logf("TestVStreamCurrent was successful") + return + } + case io.EOF: + log.Infof("stream ended\n") + cancel() + default: + log.Errorf("Returned err %v", err) + t.Fatalf("remote error: %v\n", err) + } + } +} + var printMu sync.Mutex func printEvents(evs []*binlogdatapb.VEvent) { diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 21813cb1c85..7e4e4a189df 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -244,7 +244,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha ev := proto.Clone(event).(*binlogdatapb.VEvent) ev.RowEvent.TableName = sgtid.Keyspace + "." + ev.RowEvent.TableName sendevents = append(sendevents, ev) - case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL: + case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_OTHER: sendevents = append(sendevents, event) eventss = append(eventss, sendevents) if err := vs.sendAll(sgtid, eventss); err != nil { From 75e4f696614342f14fecf36c8128c6e722c04f8d Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 30 Jun 2020 12:30:47 +0200 Subject: [PATCH 2/2] VStream Client: simplify test Signed-off-by: Rohit Nayak --- go/vt/vtgate/endtoend/vstream_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index e2632077e08..4d32cf7c126 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -216,11 +216,6 @@ func TestVStreamCurrent(t *testing.T) { gconn, conn, mconn, closeConnections := initialize(ctx, t) defer closeConnections() - _, err := conn.ExecuteFetch("insert into t1(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) - if err != nil { - t.Fatal(err) - } - var shardGtids []*binlogdatapb.ShardGtid var vgtid = &binlogdatapb.VGtid{} shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{