Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions go/mysql/binlog_event_rbr.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,14 +853,26 @@ func CellValue(data []byte, pos int, typ byte, metadata uint16, styp querypb.Typ
max := int((((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0xff))
// Length is encoded in 1 or 2 bytes.
if max > 255 {
// This code path exists due to https://bugs.mysql.com/bug.php?id=37426.
// CHAR types need to allocate 3 bytes per char. So, the length for CHAR(255)
// cannot be represented in 1 byte. This also means that this rule does not
// apply to BINARY data.
l := int(uint64(data[pos]) |
uint64(data[pos+1])<<8)
return sqltypes.MakeTrusted(querypb.Type_VARCHAR,
data[pos+2:pos+2+l]), l + 2, nil
}
l := int(data[pos])
return sqltypes.MakeTrusted(querypb.Type_VARCHAR,
data[pos+1:pos+1+l]), l + 1, nil
mdata := data[pos+1 : pos+1+l]
if sqltypes.IsBinary(styp) {
// Fixed length binaries have to be padded with zeroes
// up to the length of the field. Otherwise, equality checks
// fail against saved data. See https://github.com/vitessio/vitess/issues/3984.
ret := make([]byte, max)
copy(ret, mdata)
return sqltypes.MakeTrusted(querypb.Type_BINARY, ret), l + 1, nil
}
return sqltypes.MakeTrusted(querypb.Type_VARCHAR, mdata), l + 1, nil

case TypeGeometry:
l := 0
Expand Down
34 changes: 20 additions & 14 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func NewBinlogPlayerTables(dbClient DBClient, tablet *topodatapb.Tablet, tables
// If an error is encountered, it updates the vreplication state to "Error".
// If a stop position was specifed, and reached, the state is updated to "Stopped".
func (blp *BinlogPlayer) ApplyBinlogEvents(ctx context.Context) error {
if err := setVReplicationState(blp.dbClient, blp.uid, BlpRunning, ""); err != nil {
if err := SetVReplicationState(blp.dbClient, blp.uid, BlpRunning, ""); err != nil {
log.Errorf("Error writing Running state: %v", err)
}

Expand All @@ -180,7 +180,7 @@ func (blp *BinlogPlayer) ApplyBinlogEvents(ctx context.Context) error {
Time: time.Now(),
Message: msg,
})
if err := setVReplicationState(blp.dbClient, blp.uid, BlpError, msg); err != nil {
if err := SetVReplicationState(blp.dbClient, blp.uid, BlpError, msg); err != nil {
log.Errorf("Error writing stop state: %v", err)
}
return err
Expand All @@ -191,7 +191,7 @@ func (blp *BinlogPlayer) ApplyBinlogEvents(ctx context.Context) error {
// applyEvents returns a recordable status message on termination or an error otherwise.
func (blp *BinlogPlayer) applyEvents(ctx context.Context) error {
// Read starting values for vreplication.
pos, stopPos, maxTPS, maxReplicationLag, err := readVRSettings(blp.dbClient, blp.uid)
pos, stopPos, maxTPS, maxReplicationLag, err := ReadVRSettings(blp.dbClient, blp.uid)
if err != nil {
log.Error(err)
return err
Expand Down Expand Up @@ -244,14 +244,14 @@ func (blp *BinlogPlayer) applyEvents(ctx context.Context) error {
case blp.position.Equal(blp.stopPosition):
msg := fmt.Sprintf("not starting BinlogPlayer, we're already at the desired position %v", blp.stopPosition)
log.Info(msg)
if err := setVReplicationState(blp.dbClient, blp.uid, BlpStopped, msg); err != nil {
if err := SetVReplicationState(blp.dbClient, blp.uid, BlpStopped, msg); err != nil {
log.Errorf("Error writing stop state: %v", err)
}
return nil
case blp.position.AtLeast(blp.stopPosition):
msg := fmt.Sprintf("starting point %v greater than stopping point %v", blp.position, blp.stopPosition)
log.Error(msg)
if err := setVReplicationState(blp.dbClient, blp.uid, BlpStopped, msg); err != nil {
if err := SetVReplicationState(blp.dbClient, blp.uid, BlpStopped, msg); err != nil {
log.Errorf("Error writing stop state: %v", err)
}
// Don't return an error. Otherwise, it will keep retrying.
Expand Down Expand Up @@ -351,7 +351,7 @@ func (blp *BinlogPlayer) applyEvents(ctx context.Context) error {
if blp.position.AtLeast(blp.stopPosition) {
msg := "Reached stopping position, done playing logs"
log.Info(msg)
if err := setVReplicationState(blp.dbClient, blp.uid, BlpStopped, msg); err != nil {
if err := SetVReplicationState(blp.dbClient, blp.uid, BlpStopped, msg); err != nil {
log.Errorf("Error writing stop state: %v", err)
}
return nil
Expand Down Expand Up @@ -447,7 +447,7 @@ func (blp *BinlogPlayer) writeRecoveryPosition(tx *binlogdatapb.BinlogTransactio
}

now := time.Now().Unix()
updateRecovery := updateVReplicationPos(blp.uid, position, now, tx.EventToken.Timestamp)
updateRecovery := GenerateUpdatePos(blp.uid, position, now, tx.EventToken.Timestamp)

qr, err := blp.exec(updateRecovery)
if err != nil {
Expand Down Expand Up @@ -503,18 +503,18 @@ func CreateVReplicationTable() []string {
) ENGINE=InnoDB`}
}

// setVReplicationState updates the state in the _vt.vreplication table.
func setVReplicationState(dbClient DBClient, uid uint32, state, message string) error {
// SetVReplicationState updates the state in the _vt.vreplication table.
func SetVReplicationState(dbClient DBClient, uid uint32, state, message string) error {
query := fmt.Sprintf("update _vt.vreplication set state='%v', message=%v where id=%v", state, encodeString(message), uid)
if _, err := dbClient.ExecuteFetch(query, 1); err != nil {
return fmt.Errorf("could not set state: %v: %v", query, err)
}
return nil
}

// readVRSettings retrieves the throttler settings for
// ReadVRSettings retrieves the throttler settings for
// vreplication from the checkpoint table.
func readVRSettings(dbClient DBClient, uid uint32) (pos, stopPos string, maxTPS, maxReplicationLag int64, err error) {
func ReadVRSettings(dbClient DBClient, uid uint32) (pos, stopPos string, maxTPS, maxReplicationLag int64, err error) {
query := fmt.Sprintf("select pos, stop_pos, max_tps, max_replication_lag from _vt.vreplication where id=%v", uid)
qr, err := dbClient.ExecuteFetch(query, 1)
if err != nil {
Expand Down Expand Up @@ -554,9 +554,9 @@ func CreateVReplicationStopped(workflow string, source *binlogdatapb.BinlogSourc
encodeString(workflow), encodeString(source.String()), encodeString(position), throttler.MaxRateModuleDisabled, throttler.ReplicationLagModuleDisabled, time.Now().Unix(), BlpStopped)
}

// updateVReplicationPos returns a statement to update a value in the
// GenerateUpdatePos returns a statement to update a value in the
// _vt.vreplication table.
func updateVReplicationPos(uid uint32, pos mysql.Position, timeUpdated int64, txTimestamp int64) string {
func GenerateUpdatePos(uid uint32, pos mysql.Position, timeUpdated int64, txTimestamp int64) string {
if txTimestamp != 0 {
return fmt.Sprintf(
"update _vt.vreplication set pos=%v, time_updated=%v, transaction_timestamp=%v where id=%v",
Expand Down Expand Up @@ -601,11 +601,17 @@ func encodeString(in string) string {
}

// ReadVReplicationPos returns a statement to query the gtid for a
// given shard from the _vt.vreplication table.
// given stream from the _vt.vreplication table.
func ReadVReplicationPos(index uint32) string {
return fmt.Sprintf("select pos from _vt.vreplication where id=%v", index)
}

// ReadVReplicationStatus returns a statement to query the status fields for a
// given stream from the _vt.vreplication table.
func ReadVReplicationStatus(index uint32) string {
return fmt.Sprintf("select pos, state, message from _vt.vreplication where id=%v", index)
}

// StatsHistoryRecord is used to store a Message with timestamp
type StatsHistoryRecord struct {
Time time.Time
Expand Down
14 changes: 11 additions & 3 deletions go/vt/binlog/binlogplayer/binlog_player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func TestUpdateVReplicationPos(t *testing.T) {
"set pos='MariaDB/0-1-8283', time_updated=88822 " +
"where id=78522"

got := updateVReplicationPos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 0)
got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 0)
if got != want {
t.Errorf("updateVReplicationPos() = %#v, want %#v", got, want)
}
Expand All @@ -367,7 +367,7 @@ func TestUpdateVReplicationTimestamp(t *testing.T) {
"set pos='MariaDB/0-2-582', time_updated=88822, transaction_timestamp=481828 " +
"where id=78522"

got := updateVReplicationPos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 481828)
got := GenerateUpdatePos(78522, mysql.Position{GTIDSet: gtid.GTIDSet()}, 88822, 481828)
if got != want {
t.Errorf("updateVReplicationPos() = %#v, want %#v", got, want)
}
Expand All @@ -377,6 +377,14 @@ func TestReadVReplicationPos(t *testing.T) {
want := "select pos from _vt.vreplication where id=482821"
got := ReadVReplicationPos(482821)
if got != want {
t.Errorf("ReadVReplicationThrottlerSettings(482821) = %#v, want %#v", got, want)
t.Errorf("ReadVReplicationPos(482821) = %#v, want %#v", got, want)
}
}

func TestReadVReplicationStatus(t *testing.T) {
want := "select pos, state, message from _vt.vreplication where id=482821"
got := ReadVReplicationStatus(482821)
if got != want {
t.Errorf("ReadVReplicationStatus(482821) = %#v, want %#v", got, want)
}
}
10 changes: 10 additions & 0 deletions go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,16 @@ func (fmd *FakeMysqlDaemon) GetSchema(dbName string, tables, excludeTables []str
return tmutils.FilterTables(fmd.Schema, tables, excludeTables, includeViews)
}

// GetColumns is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) GetColumns(dbName, table string) ([]string, error) {
return []string{}, nil
}

// GetPrimaryKeyColumns is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) GetPrimaryKeyColumns(dbName, table string) ([]string, error) {
return []string{}, nil
}

// PreflightSchemaChange is part of the MysqlDaemon interface
func (fmd *FakeMysqlDaemon) PreflightSchemaChange(dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error) {
if fmd.PreflightSchemaChangeResult == nil {
Expand Down
2 changes: 2 additions & 0 deletions go/vt/mysqlctl/mysql_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type MysqlDaemon interface {

// Schema related methods
GetSchema(dbName string, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error)
GetColumns(dbName, table string) ([]string, error)
GetPrimaryKeyColumns(dbName, table string) ([]string, error)
PreflightSchemaChange(dbName string, changes []string) ([]*tabletmanagerdatapb.SchemaChangeResult, error)
ApplySchemaChange(dbName string, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error)

Expand Down
Loading