Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 23 additions & 22 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"bytes"
"encoding/hex"
"fmt"
"math"
"sync"
"time"

Expand Down Expand Up @@ -112,6 +113,7 @@ func NewStats() *Stats {
bps.Timings = stats.NewTimings("", "", "")
bps.Rates = stats.NewRates("", bps.Timings, 15, 60e9)
bps.History = history.New(3)
bps.SecondsBehindMaster.Set(math.MaxInt64)
return bps
}

Expand Down Expand Up @@ -200,18 +202,8 @@ func (blp *BinlogPlayer) applyEvents(ctx context.Context) error {
log.Error(err)
return err
}
blp.position, err = mysql.DecodePosition(settings.StartPos)
if err != nil {
log.Error(err)
return err
}
if settings.StopPos != "" {
blp.stopPosition, err = mysql.DecodePosition(settings.StopPos)
if err != nil {
log.Error(err)
return err
}
}
blp.position = settings.StartPos
blp.stopPosition = settings.StopPos
t, err := throttler.NewThrottler(
fmt.Sprintf("BinlogPlayer/%d", blp.uid),
"transactions",
Expand Down Expand Up @@ -525,8 +517,8 @@ func SetVReplicationState(dbClient DBClient, uid uint32, state, message string)

// VRSettings contains the settings of a vreplication table.
type VRSettings struct {
StartPos string
StopPos string
StartPos mysql.Position
StopPos mysql.Position
MaxTPS int64
MaxReplicationLag int64
State string
Expand All @@ -541,25 +533,34 @@ func ReadVRSettings(dbClient DBClient, uid uint32) (VRSettings, error) {
return VRSettings{}, fmt.Errorf("error %v in selecting vreplication settings %v", err, query)
}

if qr.RowsAffected != 1 {
if len(qr.Rows) != 1 {
return VRSettings{}, fmt.Errorf("checkpoint information not available in db for %v", uid)
}
vrRow := qr.Rows[0]

maxTPS, err := sqltypes.ToInt64(qr.Rows[0][2])
maxTPS, err := sqltypes.ToInt64(vrRow[2])
if err != nil {
return VRSettings{}, fmt.Errorf("failed to parse max_tps column: %v", err)
}
maxReplicationLag, err := sqltypes.ToInt64(qr.Rows[0][3])
maxReplicationLag, err := sqltypes.ToInt64(vrRow[3])
if err != nil {
return VRSettings{}, fmt.Errorf("failed to parse max_replication_lag column: %v", err)
}
startPos, err := mysql.DecodePosition(vrRow[0].ToString())
if err != nil {
return VRSettings{}, fmt.Errorf("failed to parse pos column: %v", err)
}
stopPos, err := mysql.DecodePosition(vrRow[1].ToString())
if err != nil {
return VRSettings{}, fmt.Errorf("failed to parse stop_pos column: %v", err)
}

return VRSettings{
StartPos: qr.Rows[0][0].ToString(),
StopPos: qr.Rows[0][1].ToString(),
StartPos: startPos,
StopPos: stopPos,
MaxTPS: maxTPS,
MaxReplicationLag: maxReplicationLag,
State: qr.Rows[0][4].ToString(),
State: vrRow[4].ToString(),
}, nil
}

Expand All @@ -585,12 +586,12 @@ func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource,
func GenerateUpdatePos(uid uint32, pos mysql.Position, timeUpdated int64, txTimestamp int64) string {
if txTimestamp != 0 {
return fmt.Sprintf(
"update _vt.vreplication set pos=%v, time_updated=%v, transaction_timestamp=%v where id=%v",
"update _vt.vreplication set pos=%v, time_updated=%v, transaction_timestamp=%v, message='' where id=%v",
encodeString(mysql.EncodePosition(pos)), timeUpdated, txTimestamp, uid)
}

return fmt.Sprintf(
"update _vt.vreplication set pos=%v, time_updated=%v where id=%v",
"update _vt.vreplication set pos=%v, time_updated=%v, message='' where id=%v",
encodeString(mysql.EncodePosition(pos)), timeUpdated, uid)
}

Expand Down
4 changes: 2 additions & 2 deletions go/vt/binlog/binlogplayer/binlog_player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func TestCreateVReplicationTables(t *testing.T) {
func TestUpdateVReplicationPos(t *testing.T) {
gtid := mysql.MustParseGTID("MariaDB", "0-1-8283")
want := "update _vt.vreplication " +
"set pos='MariaDB/0-1-8283', time_updated=88822 " +
"set pos='MariaDB/0-1-8283', time_updated=88822, message='' " +
"where id=78522"

got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 0)
Expand All @@ -369,7 +369,7 @@ func TestUpdateVReplicationPos(t *testing.T) {
func TestUpdateVReplicationTimestamp(t *testing.T) {
gtid := mysql.MustParseGTID("MariaDB", "0-2-582")
want := "update _vt.vreplication " +
"set pos='MariaDB/0-2-582', time_updated=88822, transaction_timestamp=481828 " +
"set pos='MariaDB/0-2-582', time_updated=88822, transaction_timestamp=481828, message='' " +
"where id=78522"

got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 481828)
Expand Down
Loading