diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index 301cda7a753..4d32cf7c126 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -210,6 +210,59 @@ func TestVStreamCopyBasic(t *testing.T) { } } +func TestVStreamCurrent(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + gconn, conn, mconn, closeConnections := initialize(ctx, t) + defer closeConnections() + + var shardGtids []*binlogdatapb.ShardGtid + var vgtid = &binlogdatapb.VGtid{} + shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ + Keyspace: "ks", + Shard: "-80", + Gtid: "current", + }) + shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ + Keyspace: "ks", + Shard: "80-", + Gtid: "current", + }) + vgtid.ShardGtids = shardGtids + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1", + }}, + } + reader, err := gconn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter) + _, _ = conn, mconn + if err != nil { + t.Fatal(err) + } + numExpectedEvents := 4 // vgtid+other per shard for "current" + require.NotNil(t, reader) + var evs []*binlogdatapb.VEvent + for { + e, err := reader.Recv() + switch err { + case nil: + evs = append(evs, e...) + printEvents(evs) // for debugging ci failures + if len(evs) == numExpectedEvents { + t.Logf("TestVStreamCurrent was successful") + return + } + case io.EOF: + log.Infof("stream ended\n") + cancel() + default: + log.Errorf("Returned err %v", err) + t.Fatalf("remote error: %v\n", err) + } + } +} + var printMu sync.Mutex func printEvents(evs []*binlogdatapb.VEvent) { diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 21813cb1c85..7e4e4a189df 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -244,7 +244,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha ev := proto.Clone(event).(*binlogdatapb.VEvent) ev.RowEvent.TableName = sgtid.Keyspace + "." + ev.RowEvent.TableName sendevents = append(sendevents, ev) - case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL: + case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_OTHER: sendevents = append(sendevents, event) eventss = append(eventss, sendevents) if err := vs.sendAll(sgtid, eventss); err != nil {