Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func AlterVReplicationTable() []string {

// SetVReplicationState updates the state in the _vt.vreplication table.
func SetVReplicationState(dbClient DBClient, uid uint32, state, message string) error {
query := fmt.Sprintf("update _vt.vreplication set state='%v', message=%v where id=%v", state, encodeString(message), uid)
query := fmt.Sprintf("update _vt.vreplication set state='%v', message=%v where id=%v", state, encodeString(MessageTruncate(message)), uid)
if _, err := dbClient.ExecuteFetch(query, 1); err != nil {
return fmt.Errorf("could not set state: %v: %v", query, err)
}
Expand Down Expand Up @@ -613,14 +613,23 @@ func StartVReplicationUntil(uid uint32, pos string) string {
func StopVReplication(uid uint32, message string) string {
return fmt.Sprintf(
"update _vt.vreplication set state='%v', message=%v where id=%v",
BlpStopped, encodeString(message), uid)
BlpStopped, encodeString(MessageTruncate(message)), uid)
}

// DeleteVReplication returns a statement to delete the replication.
func DeleteVReplication(uid uint32) string {
return fmt.Sprintf("delete from _vt.vreplication where id=%v", uid)
}

// MessageTruncate truncates the message string to a safe length.
func MessageTruncate(msg string) string {
// message length is 1000 bytes.
if len(msg) > 950 {
return msg[:950] + "..."
}
return msg
}

func encodeString(in string) string {
buf := bytes.NewBuffer(nil)
sqltypes.NewVarChar(in).EncodeSQL(buf)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (vr *vreplicator) setMessage(message string) error {
Time: time.Now(),
Message: message,
})
query := fmt.Sprintf("update _vt.vreplication set message=%v where id=%v", encodeString(message), vr.id)
query := fmt.Sprintf("update _vt.vreplication set message=%v where id=%v", encodeString(binlogplayer.MessageTruncate(message)), vr.id)
if _, err := vr.dbClient.Execute(query); err != nil {
return fmt.Errorf("could not set message: %v: %v", query, err)
}
Expand Down