Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
384 changes: 257 additions & 127 deletions go/vt/proto/binlogdata/binlogdata.pb.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions go/vt/vtcombo/tablet_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,8 @@ func (itc *internalTabletConn) StreamHealth(ctx context.Context, callback func(*
}

// VStream is part of queryservice.QueryService.
func (itc *internalTabletConn) VStream(ctx context.Context, target *querypb.Target, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
err := itc.tablet.qsc.QueryService().VStream(ctx, target, startPos, filter, send)
func (itc *internalTabletConn) VStream(ctx context.Context, target *querypb.Target, startPos string, tableLastPKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
err := itc.tablet.qsc.QueryService().VStream(ctx, target, startPos, tableLastPKs, filter, send)
return tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

Expand Down
111 changes: 104 additions & 7 deletions go/vt/vtgate/endtoend/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,48 @@ package endtoend
import (
"context"
"fmt"
"io"
"sync"
"testing"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/proto/query"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)

func TestVStream(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

func initialize(ctx context.Context, t *testing.T) (*vtgateconn.VTGateConn, *mysql.Conn, *mysql.Conn, func()) {
gconn, err := vtgateconn.Dial(ctx, grpcAddress)
if err != nil {
t.Fatal(err)
}
defer gconn.Close()
conn, err := mysql.Connect(ctx, &vtParams)
if err != nil {
t.Fatal(err)
}
defer conn.Close()
mconn, err := mysql.Connect(ctx, &mysqlParams)
if err != nil {
t.Fatal(err)
}
defer conn.Close()
close := func() {
gconn.Close()
conn.Close()
mconn.Close()
}
return gconn, conn, mconn, close
}
func TestVStream(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

gconn, conn, mconn, closeConnections := initialize(ctx, t)
defer closeConnections()

mpos, err := mconn.MasterPosition()
if err != nil {
Expand Down Expand Up @@ -128,3 +140,88 @@ func TestVStream(t *testing.T) {
}
cancel()
}

func TestVStreamCopyBasic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
gconn, conn, mconn, closeConnections := initialize(ctx, t)
defer closeConnections()

_, err := conn.ExecuteFetch("insert into t1(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false)
if err != nil {
t.Fatal(err)
}

lastPK := sqltypes.Result{
Fields: []*query.Field{{Name: "id1", Type: query.Type_INT32}},
Rows: [][]sqltypes.Value{{sqltypes.NewInt32(4)}},
}
qr := sqltypes.ResultToProto3(&lastPK)
tablePKs := []*binlogdatapb.TableLastPK{{
TableName: "t1",
Lastpk: qr,
}}
var shardGtids []*binlogdatapb.ShardGtid
var vgtid = &binlogdatapb.VGtid{}
shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
Keyspace: "ks",
Shard: "-80",
Gtid: "",
TablePKs: tablePKs,
})
shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
Keyspace: "ks",
Shard: "80-",
Gtid: "",
TablePKs: tablePKs,
})
vgtid.ShardGtids = shardGtids
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select * from t1",
}},
}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter)
_, _ = conn, mconn
if err != nil {
t.Fatal(err)
}
numExpectedEvents := 2 /* num shards */ * (7 /* begin/field/vgtid:pos/2 rowevents avg/vgitd: lastpk/commit) */ + 3 /* begin/vgtid/commit for completed table */)
require.NotNil(t, reader)
var evs []*binlogdatapb.VEvent
for {
e, err := reader.Recv()
switch err {
case nil:
evs = append(evs, e...)
if len(evs) == numExpectedEvents {
t.Logf("TestVStreamCopyBasic was successful")
return
}
printEvents(evs) // for debugging ci failures
case io.EOF:
log.Infof("stream ended\n")
cancel()
default:
log.Errorf("Returned err %v", err)
t.Fatalf("remote error: %v\n", err)
}
}
}

var printMu sync.Mutex

func printEvents(evs []*binlogdatapb.VEvent) {
printMu.Lock()
defer printMu.Unlock()
if len(evs) == 0 {
return
}
s := "\n===START===" + "\n"
for i, ev := range evs {
s += fmt.Sprintf("Event %d; %v\n", i, ev)
}
s += "===END===" + "\n"
log.Infof("%s", s)
}
31 changes: 30 additions & 1 deletion go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat
newvgtid.ShardGtids = append(newvgtid.ShardGtids, sgtid)
}
}
//TODO add tablepk validations

return newvgtid, filter, nil
}

Expand Down Expand Up @@ -212,7 +214,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected number or shards: %v", rss)
}
// Safe to access sgtid.Gtid here (because it can't change until streaming begins).
err = rss[0].Gateway.VStream(ctx, rss[0].Target, sgtid.Gtid, vs.filter, func(events []*binlogdatapb.VEvent) error {
err = rss[0].Gateway.VStream(ctx, rss[0].Target, sgtid.Gtid, sgtid.TablePKs, vs.filter, func(events []*binlogdatapb.VEvent) error {
// We received a valid event. Reset error count.
errCount = 0

Expand Down Expand Up @@ -319,6 +321,33 @@ func (vs *vstream) sendAll(sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdat
Type: binlogdatapb.VEventType_VGTID,
Vgtid: proto.Clone(vs.vgtid).(*binlogdatapb.VGtid),
}
} else if event.Type == binlogdatapb.VEventType_LASTPK {
var foundIndex = -1
eventTablePK := event.LastPKEvent.TableLastPK
for idx, pk := range sgtid.TablePKs {
if pk.TableName == eventTablePK.TableName {
foundIndex = idx
break
}
}
if foundIndex == -1 {
if !event.LastPKEvent.Completed {
sgtid.TablePKs = append(sgtid.TablePKs, eventTablePK)
}
} else {
if event.LastPKEvent.Completed {
// remove tablepk from sgtid
sgtid.TablePKs[foundIndex] = sgtid.TablePKs[len(sgtid.TablePKs)-1]
sgtid.TablePKs[len(sgtid.TablePKs)-1] = nil
sgtid.TablePKs = sgtid.TablePKs[:len(sgtid.TablePKs)-1]
} else {
sgtid.TablePKs[foundIndex] = eventTablePK
}
}
events[j] = &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_VGTID,
Vgtid: proto.Clone(vs.vgtid).(*binlogdatapb.VGtid),
}
}
}
if err := vs.send(events); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vttablet/endtoend/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestHistorianSchemaUpdate(t *testing.T) {
return nil
}
go func() {
if err := tsv.VStream(ctx, target, "current", filter, send); err != nil {
if err := tsv.VStream(ctx, target, "current", nil, filter, send); err != nil {
fmt.Printf("Error in tsv.VStream: %v", err)
t.Error(err)
}
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestSchemaVersioning(t *testing.T) {
}
go func() {
defer close(eventCh)
if err := tsv.VStream(ctx, target, "current", filter, send); err != nil {
if err := tsv.VStream(ctx, target, "current", nil, filter, send); err != nil {
fmt.Printf("Error in tsv.VStream: %v", err)
t.Error(err)
}
Expand Down Expand Up @@ -248,7 +248,7 @@ func TestSchemaVersioning(t *testing.T) {
}
go func() {
defer close(eventCh)
if err := tsv.VStream(ctx, target, startPos, filter, send); err != nil {
if err := tsv.VStream(ctx, target, startPos, nil, filter, send); err != nil {
fmt.Printf("Error in tsv.VStream: %v", err)
t.Error(err)
}
Expand Down Expand Up @@ -317,7 +317,7 @@ func TestSchemaVersioning(t *testing.T) {
}
go func() {
defer close(eventCh)
if err := tsv.VStream(ctx, target, startPos, filter, send); err != nil {
if err := tsv.VStream(ctx, target, startPos, nil, filter, send); err != nil {
fmt.Printf("Error in tsv.VStream: %v", err)
t.Error(err)
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/grpcqueryservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func (q *query) VStream(request *binlogdatapb.VStreamRequest, stream queryservic
request.EffectiveCallerId,
request.ImmediateCallerId,
)
err = q.server.VStream(ctx, request.Target, request.Position, request.Filter, func(events []*binlogdatapb.VEvent) error {
err = q.server.VStream(ctx, request.Target, request.Position, request.TableLastPKs, request.Filter, func(events []*binlogdatapb.VEvent) error {
return stream.Send(&binlogdatapb.VStreamResponse{
Events: events,
})
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/grpctabletconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ func (conn *gRPCQueryClient) StreamHealth(ctx context.Context, callback func(*qu
}

// VStream starts a VReplication stream.
func (conn *gRPCQueryClient) VStream(ctx context.Context, target *querypb.Target, position string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
func (conn *gRPCQueryClient) VStream(ctx context.Context, target *querypb.Target, position string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
stream, err := func() (queryservicepb.Query_VStreamClient, error) {
conn.mu.RLock()
defer conn.mu.RUnlock()
Expand All @@ -612,6 +612,7 @@ func (conn *gRPCQueryClient) VStream(ctx context.Context, target *querypb.Target
ImmediateCallerId: callerid.ImmediateCallerIDFromContext(ctx),
Position: position,
Filter: filter,
TableLastPKs: tablePKs,
}
stream, err := conn.c.VStream(ctx, req)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/queryservice/queryservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type QueryService interface {
MessageAck(ctx context.Context, target *querypb.Target, name string, ids []*querypb.Value) (count int64, err error)

// VStream streams VReplication events based on the specified filter.
VStream(ctx context.Context, target *querypb.Target, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error
VStream(ctx context.Context, target *querypb.Target, startPos string, tableLastPKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error

// VStreamRows streams rows of a table from the specified starting point.
VStreamRows(ctx context.Context, target *querypb.Target, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/queryservice/wrapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,9 @@ func (ws *wrappedService) MessageAck(ctx context.Context, target *querypb.Target
return count, err
}

func (ws *wrappedService) VStream(ctx context.Context, target *querypb.Target, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
func (ws *wrappedService) VStream(ctx context.Context, target *querypb.Target, startPos string, tableLastPKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
return ws.wrapper(ctx, target, ws.impl, "VStream", false, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
innerErr := conn.VStream(ctx, target, startPos, filter, send)
innerErr := conn.VStream(ctx, target, startPos, tableLastPKs, filter, send)
return false, innerErr
})
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/sandboxconn/sandboxconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (sbc *SandboxConn) AddVStreamEvents(events []*binlogdatapb.VEvent, err erro
}

// VStream is part of the QueryService interface.
func (sbc *SandboxConn) VStream(ctx context.Context, target *querypb.Target, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
func (sbc *SandboxConn) VStream(ctx context.Context, target *querypb.Target, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
if sbc.StartPos != "" && sbc.StartPos != startPos {
return fmt.Errorf("startPos(%v): %v, want %v", target, startPos, sbc.StartPos)
}
Expand Down
8 changes: 5 additions & 3 deletions go/vt/vttablet/tabletconntest/fakequeryservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,14 +666,16 @@ var TestStreamHealthStreamHealthResponse = &querypb.StreamHealthResponse{
Shard: "test_shard",
TabletType: topodatapb.TabletType_RDONLY,
},
Serving: true,
Serving: true,

TabletExternallyReparentedTimestamp: 1234589,

RealtimeStats: &querypb.RealtimeStats{
CpuUsage: 1.0,
HealthError: "random error",
SecondsBehindMaster: 234,
BinlogPlayersCount: 1,
SecondsBehindMasterFilteredReplication: 2,
CpuUsage: 1.0,
},
}

Expand All @@ -697,7 +699,7 @@ func (f *FakeQueryService) StreamHealth(ctx context.Context, callback func(*quer
}

// VStream is part of the queryservice.QueryService interface
func (f *FakeQueryService) VStream(ctx context.Context, target *querypb.Target, position string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
func (f *FakeQueryService) VStream(ctx context.Context, target *querypb.Target, position string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
panic("not implemented")
}

Expand Down
10 changes: 5 additions & 5 deletions go/vt/vttablet/tabletmanager/vreplication/external_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type VStreamerClient interface {
Close(context.Context) error

// VStream streams VReplication events based on the specified filter.
VStream(ctx context.Context, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error
VStream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error

// VStreamRows streams rows of a table from the specified starting point.
VStreamRows(ctx context.Context, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error
Expand Down Expand Up @@ -124,8 +124,8 @@ func (c *mysqlConnector) Close(ctx context.Context) error {
return nil
}

func (c *mysqlConnector) VStream(ctx context.Context, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
return c.vstreamer.Stream(ctx, startPos, filter, send)
func (c *mysqlConnector) VStream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
return c.vstreamer.Stream(ctx, startPos, tablePKs, filter, send)
}

func (c *mysqlConnector) VStreamRows(ctx context.Context, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error {
Expand Down Expand Up @@ -169,8 +169,8 @@ func (tc *tabletConnector) Close(ctx context.Context) error {
return tc.qs.Close(ctx)
}

func (tc *tabletConnector) VStream(ctx context.Context, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
return tc.qs.VStream(ctx, tc.target, startPos, filter, send)
func (tc *tabletConnector) VStream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
return tc.qs.VStream(ctx, tc.target, startPos, tablePKs, filter, send)
}

func (tc *tabletConnector) VStreamRows(ctx context.Context, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,15 @@ func (ftc *fakeTabletConn) StreamHealth(ctx context.Context, callback func(*quer
var vstreamHook func(ctx context.Context)

// VStream directly calls into the pre-initialized engine.
func (ftc *fakeTabletConn) VStream(ctx context.Context, target *querypb.Target, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
func (ftc *fakeTabletConn) VStream(ctx context.Context, target *querypb.Target, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
if target.Keyspace != "vttest" {
<-ctx.Done()
return io.EOF
}
if vstreamHook != nil {
vstreamHook(ctx)
}
return streamerEngine.Stream(ctx, startPos, filter, send)
return streamerEngine.Stream(ctx, startPos, tablePKs, filter, send)
}

// vstreamRowsHook allows you to do work just before calling VStreamRows.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (vp *vplayer) fetchAndApply(ctx context.Context) (err error) {

streamErr := make(chan error, 1)
go func() {
streamErr <- vp.vr.sourceVStreamer.VStream(ctx, mysql.EncodePosition(vp.startPos), vp.replicatorPlan.VStreamFilter, func(events []*binlogdatapb.VEvent) error {
streamErr <- vp.vr.sourceVStreamer.VStream(ctx, mysql.EncodePosition(vp.startPos), nil, vp.replicatorPlan.VStreamFilter, func(events []*binlogdatapb.VEvent) error {
return relay.Send(events)
})
}()
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/messager/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type TabletService interface {
// VStreamer defines the functions of VStreamer
// that the messager needs.
type VStreamer interface {
Stream(ctx context.Context, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error
Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error
StreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error
}

Expand Down
Loading