Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 54 additions & 48 deletions tools/rowlog/rowlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,28 @@ import (
"sync"
"time"

"vitess.io/vitess/go/vt/binlog/binlogplayer"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
"github.com/spf13/pflag"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
logutilpb "vitess.io/vitess/go/vt/proto/logutil"
"vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
_ "vitess.io/vitess/go/vt/topo/etcd2topo"
"vitess.io/vitess/go/vt/vtctl/vtctlclient"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"

_ "vitess.io/vitess/go/vt/topo/etcd2topo" // TODO: after #11394, add rowlog to this https://github.com/vitessio/vitess/pull/11394/files#diff-ee3c1b94c587244ea0645a8ee10187e1112167725f752d58cf17bab6e6d1047cR85
_ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient"
_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
_ "vitess.io/vitess/go/vt/vttablet/grpctabletconn"

_ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient"
"vitess.io/vitess/go/vt/vtctl/vtctlclient"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
logutilpb "vitess.io/vitess/go/vt/proto/logutil"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)

type RowLogConfig struct {
Expand All @@ -57,12 +59,12 @@ func (rlc *RowLogConfig) Validate() bool {

func usage() {
logger := logutil.NewConsoleLogger()
flag.CommandLine.SetOutput(logutil.NewLoggerWriter(logger))
flag.Usage = func() {
pflag.CommandLine.SetOutput(logutil.NewLoggerWriter(logger))
pflag.Usage = func() {
logger.Printf("Rowlog Usage:\n")
s := "rowlog -ids <id list csv> -table <table_name> -pk <primary_key_only_ints> -source <source_keyspace> -target <target_keyspace> "
s += "-vtctld <vtctl url> -vtgate <vtgate url> -cells <cell names csv> -topo_implementation <topo type, eg: etcd2> "
s += "-topo_global_server_address <top url> -topo_global_root <topo root dir>\n"
s := "rowlog --ids <id list csv> --table <table_name> --pk <primary_key_only_ints> --source <source_keyspace> --target <target_keyspace> "
s += "--vtctld <vtctl url> --vtgate <vtgate url> --cells <cell names csv> --topo_implementation <topo type, eg: etcd2> "
s += "--topo_global_server_address <top url> --topo_global_root <topo root dir>\n"
logger.Printf(s)
}
}
Expand All @@ -73,7 +75,7 @@ func main() {
ctx := context.Background()
config := parseCommandLine()
if !config.Validate() {
flag.Usage()
pflag.Usage()
return
}
log.Infof("Starting rowlogger with config: %s", config)
Expand Down Expand Up @@ -159,14 +161,13 @@ func startStreaming(ctx context.Context, vtgate, vtctld, keyspace, tablet, table
}
defer conn.Close()
reader, _ := conn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, &vtgatepb.VStreamFlags{})
var fields []*query.Field
var fields []*querypb.Field
var gtid string
var plan *TablePlan
var lastLoggedAt int64
var totalRowsForTable, filteredRows int
for {
evs, err := reader.Recv()
// fmt.Printf("events received: %d\n",len(evs))
switch err {
case nil:
for _, ev := range evs {
Expand All @@ -180,10 +181,8 @@ func startStreaming(ctx context.Context, vtgate, vtctld, keyspace, tablet, table
switch ev.Type {
case binlogdatapb.VEventType_VGTID:
gtid = ev.Vgtid.ShardGtids[0].Gtid
// fmt.Printf("gtid %s\n", gtid)
case binlogdatapb.VEventType_FIELD:
fields = ev.FieldEvent.Fields
// fmt.Printf("field %s\n", fields)
plan = getTablePlan(keyspace, fields, ev.FieldEvent.TableName, pk, ids)
if !fieldsPrinted {
outputHeader(plan)
Expand All @@ -193,30 +192,30 @@ func startStreaming(ctx context.Context, vtgate, vtctld, keyspace, tablet, table
totalRowsForTable += len(ev.RowEvent.RowChanges)
rows := processRowEvent(plan, gtid, ev)
if len(rows) > 0 {
// fmt.Printf("#rows %d\n", len(rows))
filteredRows += len(rows)
outputRows(plan, rows)
}
default:
// fmt.Printf("event type %v\n",ev.Type)
}
}
// fmt.Printf("stopPos %s\n", stopPos)
var err error
var currentPosition, stopPosition mysql.Position
currentPosition, err = binlogplayer.DecodePosition(gtid)
if err != nil {
fmt.Errorf("Error decoding position for %s:%vs\n", gtid, err.Error())
fmt.Printf("Error decoding position for %s:%vs\n", gtid, err.Error())
}
stopPosition, err = binlogplayer.DecodePosition(stopPos)
if err != nil {
fmt.Errorf("Error decoding position for %s:%vs\n", stopPos, err.Error())
fmt.Printf("Error decoding position for %s:%vs\n", stopPos, err.Error())
}
if currentPosition.AtLeast(stopPosition) {
log.Infof("Finished streaming keyspace %s from %s upto %s, total rows seen %d", keyspace, startPos, stopPos, totalRowsForTable)
return "", "", true, true, nil
}
// return gtid, stopPos, false, fieldsPrinted, nil //uncomment for testing resumability

if testResumability {
return gtid, stopPos, false, fieldsPrinted, nil
}
case io.EOF:
log.Infof("stream ended before reaching stop pos")
fmt.Printf("stream ended before reaching stop pos\n")
Expand Down Expand Up @@ -336,7 +335,7 @@ func processRowEvent(plan *TablePlan, gtid string, ev *binlogdatapb.VEvent) []*R
return rowLogs
}

func getTablePlan(keyspace string, fields []*query.Field, table, pk string, ids []string) *TablePlan {
func getTablePlan(keyspace string, fields []*querypb.Field, table, pk string, ids []string) *TablePlan {
allowedIds := make(map[string]bool)
for _, id := range ids {
allowedIds[id] = true
Expand All @@ -362,13 +361,12 @@ type TablePlan struct {
table, pk string
allowedIds map[string]bool
pkIndex int64
fields []*query.Field
fields []*querypb.Field
keyspace string
}

func getFlavor(ctx context.Context, server, keyspace string) string {
curPos, err := getPosition(ctx, server, keyspace, "0")
// fmt.Printf("curpos is %s\n", curPos)
if err != nil {
return ""
}
Expand All @@ -392,27 +390,39 @@ func getTablet(ctx context.Context, ts *topo.Server, cells []string, keyspace st
return tabletId

}
func trickGlog() {
var args []string
os.Args, args = os.Args[:1], os.Args[1:]
defer func() { os.Args = append(os.Args, args...) }()
flag.Parse()
}

var testResumability bool

func parseCommandLine() *RowLogConfig {
sourceKeyspace := flag.String("source", "", "")
targetKeyspace := flag.String("target", "", "")
ids := flag.String("ids", "", "")
pk := flag.String("pk", "", "")
table := flag.String("table", "", "")
vtgate := flag.String("vtgate", "", "")
vtctld := flag.String("vtctld", "", "")
cells := flag.String("cells", "", "")
trickGlog()
sourceKeyspace := pflag.StringP("source", "s", "", "")
targetKeyspace := pflag.StringP("target", "t", "", "")
ids := pflag.StringSlice("ids", nil, "")
pk := pflag.String("pk", "", "")
table := pflag.String("table", "", "")
vtgate := pflag.String("vtgate", "", "")
vtctld := pflag.String("vtctld", "", "")
cells := pflag.StringSlice("cells", nil, "")

flag.Parse()
pflag.BoolVar(&testResumability, "test_resumability", testResumability, "set to test stream resumability")

pflag.Parse()

return &RowLogConfig{
sourceKeyspace: *sourceKeyspace,
targetKeyspace: *targetKeyspace,
table: *table,
pk: *pk,
ids: strings.Split(*ids, ","),
ids: *ids,
vtctld: *vtctld,
vtgate: *vtgate,
cells: strings.Split(*cells, ","),
cells: *cells,
}
}

Expand All @@ -422,7 +432,7 @@ func processPositionResult(gtidset string) (string, string) {
subs := strings.Split(arr[1], "-")
id, err := strconv.Atoi(subs[0])
if err != nil {
fmt.Errorf(err.Error())
fmt.Printf(err.Error())
return "", ""
}
firstPos := arr[0] + ":" + strconv.Itoa(id) // subs[0]
Expand All @@ -435,37 +445,33 @@ func parseExecOutput(result string) string {
resultMap := make(map[string]any)
err := json.Unmarshal([]byte(result), &resultMap)
if err != nil {
fmt.Errorf("error parsing result json %s", result)
fmt.Printf("error parsing result json %s", result)
return ""
}
rows := reflect.ValueOf(resultMap["rows"])
s := fmt.Sprintf("%v", rows)
s = strings.Trim(s, "[]")
// fmt.Printf("gtidset %s", s)
return s
}

func getPositions(ctx context.Context, server, tablet string) (string, string, error) {
query := "select GTID_SUBTRACT(@@GLOBAL.gtid_executed, GTID_SUBTRACT(@@GLOBAL.gtid_executed, @@GLOBAL.gtid_purged));"
results, err := execVtctl(ctx, server, []string{"ExecuteFetchAsDba", "-json", tablet, query})
results, err := execVtctl(ctx, server, []string{"ExecuteFetchAsDba", "--json", tablet, query})
if err != nil {
fmt.Println(err)
log.Errorf(err.Error())
return "", "", err
}
// fmt.Printf("results are %v\n", results)
firstPos := parseExecOutput(strings.Join(results, ""))

query = "select @@GLOBAL.gtid_executed;"
results, err = execVtctl(ctx, server, []string{"ExecuteFetchAsDba", "-json", tablet, query})
results, err = execVtctl(ctx, server, []string{"ExecuteFetchAsDba", "--json", tablet, query})
if err != nil {
fmt.Println(err)
log.Errorf(err.Error())
return "", "", err
}
// fmt.Printf("results are %v\n", results)
lastPos := parseExecOutput(strings.Join(results, ""))
// fmt.Printf("firstPos %s, lastPos %s\n", firstPos, lastPos)
return firstPos, lastPos, nil
}

Expand Down