Skip to content

Commit

Permalink
feat: add transaction mark to support OceanBase binlog service
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe committed Jan 31, 2024
1 parent 02acf74 commit 5fa9ef0
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 16 deletions.
19 changes: 12 additions & 7 deletions session/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,11 +490,6 @@ func (s *session) checkOptions() error {
return fmt.Errorf("con:%d %v", s.sessionVars.ConnectionID, err)
}

if s.opt.tranBatch > 1 {
s.ddlDB, _ = gorm.Open("mysql", fmt.Sprintf("%s&autocommit=1", addr))
s.ddlDB.LogMode(false)
}

// 禁用日志记录器,不显示任何日志
db.LogMode(false)

Expand All @@ -513,13 +508,13 @@ func (s *session) checkOptions() error {
if s.inc.BackupHost == "" || s.inc.BackupPort == 0 || s.inc.BackupUser == "" {
return errors.New(s.getErrorMessage(ER_INVALID_BACKUP_HOST_INFO))
}
addr = fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=%s&parseTime=True&loc=Local&autocommit=1",
backupAddr := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=%s&parseTime=True&loc=Local&autocommit=1",
s.inc.BackupUser, s.inc.BackupPassword, s.inc.BackupHost, s.inc.BackupPort,
s.inc.DefaultCharset)
if s.inc.BackupTLS != "" {
addr += "&tls=" + s.inc.BackupTLS
}
backupdb, err := gorm.Open("mysql", addr)
backupdb, err := gorm.Open("mysql", backupAddr)

if err != nil {
return fmt.Errorf("con:%d %v", s.sessionVars.ConnectionID, err)
Expand Down Expand Up @@ -552,6 +547,16 @@ func (s *session) checkOptions() error {
s.appendErrorMsg("TiDB暂不支持备份功能.")
}

if s.opt.Backup && s.needTransactionMark() && s.opt.tranBatch <= 1 {
s.opt.tranBatch = 50
log.Infof("enable transaction with batch size %d to backup with transaction mark", 50)
}

if s.opt.tranBatch > 1 {
s.ddlDB, _ = gorm.Open("mysql", fmt.Sprintf("%s&autocommit=1", addr))
s.ddlDB.LogMode(false)
}

return nil
}

Expand Down
43 changes: 37 additions & 6 deletions session/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,12 @@ func (s *session) getNextBackupRecord() *Record {
// }

// 如果开始位置和结果位置相同,说明无变更(受影响行数为0)
if r.StartFile == r.EndFile && r.StartPosition == r.EndPosition {
if !s.needTransactionMark() && r.StartFile == r.EndFile && r.StartPosition == r.EndPosition {
continue
}

// 当使用事务标记时,不再使用 binlog 偏移量判断是否有变更
if s.needTransactionMark() && r.AffectedRows == 0 {
continue
}

Expand Down Expand Up @@ -282,6 +287,8 @@ func (s *session) parserBinlog(ctx context.Context) {
}
}()

var started bool

for {
e, err := logSync.GetEvent(context.Background())
if err != nil {
Expand All @@ -301,9 +308,21 @@ func (s *session) parserBinlog(ctx context.Context) {
}
}

// 如果还没有到操作的binlog范围,跳过
if currentPosition.Compare(startPosition) == -1 {
continue
if s.needTransactionMark() {
if !started {
txMark := s.toTransactionMark(e)
if txMark != nil && txMark.MarkType == transactionMarkTypeStart && txMark.LogFile == record.StartFile && txMark.LogPosition == record.StartPosition {
started = true
currentThreadID = txMark.ThreadID
log.Infof("found transaction start mark: %+v, binlog parser started", txMark)
}
continue
}
} else {
// 如果还没有到操作的binlog范围,跳过
if currentPosition.Compare(startPosition) == -1 {
continue
}
}

// log.Errorf("binlog pos: %d", int(currentPosition.Pos))
Expand All @@ -327,7 +346,7 @@ func (s *session) parserBinlog(ctx context.Context) {
// log.Error(string(event.Query))
// }

if s.dbType == DBTypeMariaDB && s.dbVersion >= 100000 {
if s.dbType == DBTypeOceanBase || (s.dbType == DBTypeMariaDB && s.dbVersion >= 100000) {
goto ENDCHECK
}

Expand Down Expand Up @@ -385,8 +404,16 @@ func (s *session) parserBinlog(ctx context.Context) {
}

ENDCHECK:
if s.needTransactionMark() {
txMark := s.toTransactionMark(e)
if txMark != nil && txMark.MarkType == transactionMarkTypeEnd && txMark.LogFile == record.StartFile && txMark.LogPosition == record.StartPosition {
log.Infof("found transaction end mark: %+v, binlog parsing finished", txMark)
break
}
}

// 如果操作已超过binlog范围,切换到下一日志
if currentPosition.Compare(stopPosition) > -1 {
if !s.needTransactionMark() && currentPosition.Compare(stopPosition) > -1 {
// sql被kill后,如果备份时可以检测到行,则认为执行成功
// 工单只有执行成功,才允许标记为备份成功
// if (record.StageStatus == StatusExecFail && record.AffectedRows > 0) ||
Expand Down Expand Up @@ -433,6 +460,10 @@ func (s *session) parserBinlog(ctx context.Context) {
s.myRecord = next
record = next
} else {
if s.needTransactionMark() {
continue
}
log.Info("all binlog records processed, exit binlog parser")
break
}
}
Expand Down
90 changes: 90 additions & 0 deletions session/session_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"unicode/utf8"

mysqlDriver "github.com/go-sql-driver/mysql"
"github.com/hanchuanchuan/go-mysql/replication"
"github.com/hanchuanchuan/goInception/ast"
"github.com/hanchuanchuan/goInception/mysql"
"github.com/jinzhu/gorm"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -422,6 +424,7 @@ func (s *session) mysqlCreateBackupTable(record *Record) (
longDataType = cache.longDataType
hostMaxLength = cache.hostMaxLength
}

record.TableInfo.IsCreated = true
return
}
Expand Down Expand Up @@ -493,3 +496,90 @@ func (s *session) checkBackupTableHostMaxLength(dbname string) (length int) {
return 0

}

func (s *session) needTransactionMark() bool {
return s.dbType == DBTypeOceanBase
}

type transactionMarkType int

const (
transactionMarkTypeStart transactionMarkType = iota
transactionMarkTypeEnd
)

type TransactionMarkData struct {
ThreadID uint32
LogFile string
LogPosition int
}

func (s *session) createTransactionMarkTable() {
sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`", transactionMarkDb)
if s.executeInternal(sql) {
sql = fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s`.`%s` ("+
"id INT AUTO_INCREMENT PRIMARY KEY, "+
"mark_type INT, "+
"thread_id INT UNSIGNED, "+
"log_file VARCHAR(40), "+
"log_position INT "+
")", transactionMarkDb, transactionMarkTable)
s.executeInternal(sql)
}
}

func (s *session) markTransactionStart(tx *gorm.DB, data *TransactionMarkData) *gorm.DB {
return s.insertTransactionMark(tx, transactionMarkTypeStart, data)
}

func (s *session) markTransactionEnd(tx *gorm.DB, data *TransactionMarkData) *gorm.DB {
return s.insertTransactionMark(tx, transactionMarkTypeEnd, data)
}

func (s *session) insertTransactionMark(tx *gorm.DB, markType transactionMarkType, data *TransactionMarkData) *gorm.DB {
sql := fmt.Sprintf("INSERT INTO `%s`.`%s` "+
"(mark_type, thread_id, log_file, log_position) "+
"VALUES (%d, %d,'%s', %d)", transactionMarkDb, transactionMarkTable,
markType, data.ThreadID, data.LogFile, data.LogPosition)
return tx.Exec(sql)
}

func (s *session) isTransactionMark(dbname string, table string) bool {
return strings.EqualFold(dbname, transactionMarkDb) && strings.EqualFold(table, transactionMarkTable)
}

type transactionMark struct {
ID int32
MarkType transactionMarkType
TransactionMarkData
}

func (s *session) toTransactionMark(e *replication.BinlogEvent) *transactionMark {
if event, ok := e.Event.(*replication.RowsEvent); ok {
if s.isTransactionMark(string(event.Table.Schema), string(event.Table.Table)) {
for _, rows := range event.Rows {
mark := &transactionMark{}
mark.ID = rows[0].(int32)
mark.MarkType = transactionMarkType(rows[1].(int32))
mark.ThreadID = uint32(1<<32 + int64(rows[2].(int32)))
mark.LogFile = rows[3].(string)
mark.LogPosition = int(rows[4].(int32))
return mark
}
}
}
return nil
}

func (s *session) executeInternal(sql string) bool {
if _, err := s.exec(sql, true); err != nil {
log.Errorf("con:%d %v", s.sessionVars.ConnectionID, err)
if myErr, ok := err.(*mysqlDriver.MySQLError); ok {
s.appendErrorMsg(myErr.Message + " (sql: " + sql + ")")
} else {
s.appendErrorMsg(err.Error())
}
return false
}
return true
}
42 changes: 39 additions & 3 deletions session/session_inception.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ const (
maxKeyLength57 = 3072

remoteBackupTable = "$_$Inception_backup_information$_$"
transactionMarkDb = "$_$Inception$_$"
transactionMarkTable = "transaction_mark"
TABLE_COMMENT_MAXLEN = 2048
COLUMN_COMMENT_MAXLEN = 1024
INDEX_COMMENT_MAXLEN = 1024
Expand Down Expand Up @@ -1080,6 +1082,10 @@ func (s *session) executeTransaction(records []*Record) int {
records = records[0:skipIndex]
}

if s.needTransactionMark() {
s.createTransactionMarkTable()
}

// 开始事务
tx := s.db.Begin()

Expand All @@ -1100,6 +1106,7 @@ func (s *session) executeTransaction(records []*Record) int {
}

currentThreadId := s.fetchTranThreadID(tx)
var txMarkData *TransactionMarkData

for i := range records {
record := records[i]
Expand All @@ -1119,6 +1126,30 @@ func (s *session) executeTransaction(records []*Record) int {
}
record.StartFile = masterStatus.File
record.StartPosition = masterStatus.Position

txMarkData = &TransactionMarkData{
ThreadID: currentThreadId,
LogFile: masterStatus.File,
LogPosition: masterStatus.Position,
}

if s.needTransactionMark() {
res := s.markTransactionStart(tx, txMarkData)
if errs := res.GetErrors(); len(errs) > 0 {
tx.Rollback()
log.Errorf("con:%d %v", s.sessionVars.ConnectionID, errs)
record.StageStatus = StatusExecFail
record.ExecComplete = false
for _, err := range errs {
if myErr, ok := err.(*mysqlDriver.MySQLError); ok {
s.appendErrorMsg(myErr.Message)
} else {
s.appendErrorMsg(err.Error())
}
}
return 2
}
}
}

record.Stage = StageExec
Expand All @@ -1129,8 +1160,13 @@ func (s *session) executeTransaction(records []*Record) int {
record.ExecTime = fmt.Sprintf("%.3f", time.Since(start).Seconds())
record.ExecTimestamp = time.Now().Unix()

if errs := res.GetErrors(); len(errs) > 0 {
tx.Rollback()
errs := res.GetErrors()

if len(errs) == 0 && s.opt.Backup && s.needTransactionMark() && i == len(records)-1 {
errs = s.markTransactionEnd(tx, txMarkData).GetErrors()
}

if len(errs) > 0 {
log.Errorf("con:%d %v", s.sessionVars.ConnectionID, errs)

for j := range records {
Expand Down Expand Up @@ -1180,7 +1216,7 @@ func (s *session) executeTransaction(records []*Record) int {
record.EndPosition = masterStatus.Position

// 开始位置和结束位置一样,无变更
if record.StartFile == record.EndFile &&
if !s.needTransactionMark() && record.StartFile == record.EndFile &&
record.StartPosition == record.EndPosition {

record.StartFile = ""
Expand Down

0 comments on commit 5fa9ef0

Please sign in to comment.