Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions examples/local/vstream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ import (
"log"
"time"

vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/sqltypes"
_ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient"
_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)

/*
Expand All @@ -38,7 +41,7 @@ import (
*/
func main() {
ctx := context.Background()
streamCustomer := true
streamCustomer := false
var vgtid *binlogdatapb.VGtid
if streamCustomer {
vgtid = &binlogdatapb.VGtid{
Expand All @@ -54,11 +57,21 @@ func main() {
Gtid: "",
}}}
} else {
lastPK := sqltypes.Result{
Fields: []*querypb.Field{{Name: "customer_id", Type: querypb.Type_INT64, Charset: collations.CollationBinaryID, Flags: uint32(querypb.MySqlFlag_NUM_FLAG | querypb.MySqlFlag_BINARY_FLAG)}},
Rows: [][]sqltypes.Value{{sqltypes.NewInt64(5)}},
}
vgtid = &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: "commerce",
Keyspace: "customer",
Shard: "0",
Gtid: "",
TablePKs: []*binlogdatapb.TableLastPK{
{
TableName: "customer",
Lastpk: sqltypes.ResultToProto3(&lastPK),
},
},
}}}
}
filter := &binlogdatapb.Filter{
Expand Down
98 changes: 97 additions & 1 deletion go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ import (
"fmt"
"io"
"regexp"
"slices"
"strings"
"sync"
"time"

"golang.org/x/exp/maps"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/srvtopo"
Expand Down Expand Up @@ -180,6 +184,58 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta
TabletOrder: flags.GetTabletOrder(),
},
}
keyspaces := make(map[string]bool)
for _, sgtid := range vgtid.ShardGtids {
if len(sgtid.TablePKs) > 0 {
keyspaces[sgtid.GetKeyspace()] = true
} else {
keyspaces[sgtid.GetKeyspace()] = false
}
}
for keyspace := range keyspaces {
reshard, servingShards, err := vs.keyspaceHasBeenResharded(ctx, keyspace)
if err != nil {
return err
}
if !reshard {
continue
}
if flags.StopOnReshard {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Keyspace %s has been resharded", keyspace)
}
if keyspaces[keyspace] { // Last tablepk values were provided
// So we can try to resume the copy phase for the tables.
// Remove the old shards from the vgtid and add the new shards.
newsgtids := make([]*binlogdatapb.ShardGtid, 0, len(vs.vgtid.ShardGtids))
var lastPKs []*binlogdatapb.TableLastPK
for _, cursgtid := range vs.vgtid.ShardGtids {
if cursgtid.Keyspace == keyspace {
if (cursgtid.TablePKs == nil) || (lastPKs != nil && !slices.Equal(cursgtid.TablePKs, lastPKs)) {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Keyspace %s has been resharded and empty or different table last PK values were found across shards so we cannot resume",
keyspace)
}
lastPKs = cursgtid.TablePKs
continue
}
newsgtids = append(newsgtids, cursgtid)
}
// Add the new ones.
for _, shard := range servingShards {
newsgtid := &binlogdatapb.ShardGtid{
Keyspace: keyspace,
Gtid: "current", // Start a copy phase, resuming from the lastPKs seen
Shard: shard.ShardName(),
TablePKs: lastPKs, // The resume point per table
}
newsgtids = append(newsgtids, newsgtid)
}
vs.vgtid.ShardGtids = newsgtids
} else {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "Keyspace %s has been resharded but no last tablepk values have been provided so we cannot resume",
keyspace)
}
}

return vs.stream(ctx)
}

Expand Down Expand Up @@ -737,7 +793,7 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e
if err := vs.getError(); err != nil {
return err
}
// convert all gtids to vgtids. This should be done here while holding the lock.
// Convert all gtids to vgtids. This should be done here while holding the lock.
for j, event := range events {
if event.Type == binlogdatapb.VEventType_GTID {
// Update the VGtid and send that instead.
Expand All @@ -752,6 +808,11 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e
var foundIndex = -1
eventTablePK := event.LastPKEvent.TableLastPK
for idx, pk := range sgtid.TablePKs {
// There was no matching row on the given shard. This can happen
// when resuming after a Reshard.
if pk == nil {
continue
}
if pk.TableName == eventTablePK.TableName {
foundIndex = idx
break
Expand Down Expand Up @@ -921,3 +982,38 @@ func (vs *vstream) getJournalEvent(ctx context.Context, sgtid *binlogdatapb.Shar
close(je.done)
return je, nil
}

func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string) (bool, []*topo.ShardInfo, error) {
smap, err := vs.ts.FindAllShardsInKeyspace(ctx, keyspace, nil)
if err != nil || len(smap) == 0 {
return false, nil, err
}
ksShardGTIDs := make([]*binlogdatapb.ShardGtid, 0, len(vs.vgtid.ShardGtids))
for _, sg := range vs.vgtid.ShardGtids {
if sg.Keyspace == keyspace {
ksShardGTIDs = append(ksShardGTIDs, sg)
}
}
reshard := false
shards := maps.Values(smap)
servingShards := make([]*topo.ShardInfo, 0, len(shards))
if len(smap) != len(ksShardGTIDs) {
for i, shard := range shards {
if shard.IsPrimaryServing {
servingShards = append(servingShards, shard)
}
if !reshard {
for n, s := range shards {
if i == n { // Same shard
continue
}
if key.KeyRangeIntersect(shard.GetKeyRange(), s.GetKeyRange()) {
reshard = true
break
}
}
}
}
}
return reshard, servingShards, nil
}