Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add mark to transaction for OceanBase #619

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions session/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,11 +490,6 @@ func (s *session) checkOptions() error {
return fmt.Errorf("con:%d %v", s.sessionVars.ConnectionID, err)
}

if s.opt.tranBatch > 1 {
s.ddlDB, _ = gorm.Open("mysql", fmt.Sprintf("%s&autocommit=1", addr))
s.ddlDB.LogMode(false)
}

// 禁用日志记录器,不显示任何日志
db.LogMode(false)

Expand All @@ -513,13 +508,13 @@ func (s *session) checkOptions() error {
if s.inc.BackupHost == "" || s.inc.BackupPort == 0 || s.inc.BackupUser == "" {
return errors.New(s.getErrorMessage(ER_INVALID_BACKUP_HOST_INFO))
}
addr = fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=%s&parseTime=True&loc=Local&autocommit=1",
backupAddr := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=%s&parseTime=True&loc=Local&autocommit=1",
s.inc.BackupUser, s.inc.BackupPassword, s.inc.BackupHost, s.inc.BackupPort,
s.inc.DefaultCharset)
if s.inc.BackupTLS != "" {
addr += "&tls=" + s.inc.BackupTLS
}
backupdb, err := gorm.Open("mysql", addr)
backupdb, err := gorm.Open("mysql", backupAddr)

if err != nil {
return fmt.Errorf("con:%d %v", s.sessionVars.ConnectionID, err)
Expand Down Expand Up @@ -552,6 +547,16 @@ func (s *session) checkOptions() error {
s.appendErrorMsg("TiDB暂不支持备份功能.")
}

if s.opt.Backup && s.needTransactionMark() && s.opt.tranBatch <= 1 {
s.opt.tranBatch = 50
log.Infof("enable transaction with batch size %d to backup with transaction mark", 50)
}

if s.opt.tranBatch > 1 {
s.ddlDB, _ = gorm.Open("mysql", fmt.Sprintf("%s&autocommit=1", addr))
s.ddlDB.LogMode(false)
}

return nil
}

Expand Down
43 changes: 37 additions & 6 deletions session/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,12 @@ func (s *session) getNextBackupRecord() *Record {
// }

// 如果开始位置和结果位置相同,说明无变更(受影响行数为0)
if r.StartFile == r.EndFile && r.StartPosition == r.EndPosition {
if !s.needTransactionMark() && r.StartFile == r.EndFile && r.StartPosition == r.EndPosition {
continue
}

// 当使用事务标记时,不再使用 binlog 偏移量判断是否有变更
if s.needTransactionMark() && r.AffectedRows == 0 {
continue
}

Expand Down Expand Up @@ -282,6 +287,8 @@ func (s *session) parserBinlog(ctx context.Context) {
}
}()

var started bool

for {
e, err := logSync.GetEvent(context.Background())
if err != nil {
Expand All @@ -301,9 +308,21 @@ func (s *session) parserBinlog(ctx context.Context) {
}
}

// 如果还没有到操作的binlog范围,跳过
if currentPosition.Compare(startPosition) == -1 {
continue
if s.needTransactionMark() {
if !started {
txMark := s.toTransactionMark(e)
if txMark != nil && txMark.MarkType == transactionMarkTypeStart && txMark.LogFile == record.StartFile && txMark.LogPosition == record.StartPosition {
started = true
currentThreadID = txMark.ThreadID
log.Infof("found transaction start mark: %+v, binlog parser started", txMark)
}
continue
}
} else {
// 如果还没有到操作的binlog范围,跳过
if currentPosition.Compare(startPosition) == -1 {
continue
}
}

// log.Errorf("binlog pos: %d", int(currentPosition.Pos))
Expand All @@ -327,7 +346,7 @@ func (s *session) parserBinlog(ctx context.Context) {
// log.Error(string(event.Query))
// }

if s.dbType == DBTypeMariaDB && s.dbVersion >= 100000 {
if s.dbType == DBTypeOceanBase || (s.dbType == DBTypeMariaDB && s.dbVersion >= 100000) {
goto ENDCHECK
}

Expand Down Expand Up @@ -385,8 +404,16 @@ func (s *session) parserBinlog(ctx context.Context) {
}

ENDCHECK:
if s.needTransactionMark() {
txMark := s.toTransactionMark(e)
if txMark != nil && txMark.MarkType == transactionMarkTypeEnd && txMark.LogFile == record.StartFile && txMark.LogPosition == record.StartPosition {
log.Infof("found transaction end mark: %+v, binlog parsing finished", txMark)
break
}
}

// 如果操作已超过binlog范围,切换到下一日志
if currentPosition.Compare(stopPosition) > -1 {
if !s.needTransactionMark() && currentPosition.Compare(stopPosition) > -1 {
// sql被kill后,如果备份时可以检测到行,则认为执行成功
// 工单只有执行成功,才允许标记为备份成功
// if (record.StageStatus == StatusExecFail && record.AffectedRows > 0) ||
Expand Down Expand Up @@ -433,6 +460,10 @@ func (s *session) parserBinlog(ctx context.Context) {
s.myRecord = next
record = next
} else {
if s.needTransactionMark() {
continue
}
log.Info("all binlog records processed, exit binlog parser")
break
}
}
Expand Down
89 changes: 89 additions & 0 deletions session/session_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
"sync"
"unicode/utf8"

"github.com/go-mysql-org/go-mysql/replication"
mysqlDriver "github.com/go-sql-driver/mysql"
"github.com/hanchuanchuan/goInception/ast"
"github.com/hanchuanchuan/goInception/mysql"
"github.com/jinzhu/gorm"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -493,3 +495,90 @@ func (s *session) checkBackupTableHostMaxLength(dbname string) (length int) {
return 0

}

func (s *session) needTransactionMark() bool {
return s.dbType == DBTypeOceanBase
}

type transactionMarkType int

const (
transactionMarkTypeStart transactionMarkType = iota
transactionMarkTypeEnd
)

type TransactionMarkData struct {
ThreadID uint32
LogFile string
LogPosition int
}

func (s *session) createTransactionMarkTable() {
sql := fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`", transactionMarkDb)
if s.executeInternal(sql) {
sql = fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s`.`%s` ("+
"id INT AUTO_INCREMENT PRIMARY KEY, "+
"mark_type INT, "+
"thread_id INT UNSIGNED, "+
"log_file VARCHAR(40), "+
"log_position INT "+
")", transactionMarkDb, transactionMarkTable)
s.executeInternal(sql)
}
}

func (s *session) markTransactionStart(tx *gorm.DB, data *TransactionMarkData) *gorm.DB {
return s.insertTransactionMark(tx, transactionMarkTypeStart, data)
}

func (s *session) markTransactionEnd(tx *gorm.DB, data *TransactionMarkData) *gorm.DB {
return s.insertTransactionMark(tx, transactionMarkTypeEnd, data)
}

func (s *session) insertTransactionMark(tx *gorm.DB, markType transactionMarkType, data *TransactionMarkData) *gorm.DB {
sql := fmt.Sprintf("INSERT INTO `%s`.`%s` "+
"(mark_type, thread_id, log_file, log_position) "+
"VALUES (%d, %d,'%s', %d)", transactionMarkDb, transactionMarkTable,
markType, data.ThreadID, data.LogFile, data.LogPosition)
return tx.Exec(sql)
}

func (s *session) isTransactionMark(dbname string, table string) bool {
return strings.EqualFold(dbname, transactionMarkDb) && strings.EqualFold(table, transactionMarkTable)
}

type transactionMark struct {
ID int32
MarkType transactionMarkType
TransactionMarkData
}

func (s *session) toTransactionMark(e *replication.BinlogEvent) *transactionMark {
if event, ok := e.Event.(*replication.RowsEvent); ok {
if s.isTransactionMark(string(event.Table.Schema), string(event.Table.Table)) {
for _, rows := range event.Rows {
mark := &transactionMark{}
mark.ID = rows[0].(int32)
mark.MarkType = transactionMarkType(rows[1].(int32))
mark.ThreadID = uint32(1<<32 + int64(rows[2].(int32)))
mark.LogFile = rows[3].(string)
mark.LogPosition = int(rows[4].(int32))
return mark
}
}
}
return nil
}

func (s *session) executeInternal(sql string) bool {
if _, err := s.exec(sql, true); err != nil {
log.Errorf("con:%d %v", s.sessionVars.ConnectionID, err)
if myErr, ok := err.(*mysqlDriver.MySQLError); ok {
s.appendErrorMsg(myErr.Message + " (sql: " + sql + ")")
} else {
s.appendErrorMsg(err.Error())
}
return false
}
return true
}
42 changes: 39 additions & 3 deletions session/session_inception.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ const (
maxKeyLength57 = 3072

remoteBackupTable = "$_$Inception_backup_information$_$"
transactionMarkDb = "$_$Inception$_$"
transactionMarkTable = "transaction_mark"
TABLE_COMMENT_MAXLEN = 2048
COLUMN_COMMENT_MAXLEN = 1024
INDEX_COMMENT_MAXLEN = 1024
Expand Down Expand Up @@ -1123,6 +1125,10 @@ func (s *session) executeTransaction(records []*Record) int {
records = records[0:skipIndex]
}

if s.needTransactionMark() {
s.createTransactionMarkTable()
}

// 开始事务
tx := s.db.Begin()

Expand All @@ -1143,6 +1149,7 @@ func (s *session) executeTransaction(records []*Record) int {
}

currentThreadId := s.fetchTranThreadID(tx)
var txMarkData *TransactionMarkData

for i := range records {
record := records[i]
Expand All @@ -1162,6 +1169,30 @@ func (s *session) executeTransaction(records []*Record) int {
}
record.StartFile = masterStatus.File
record.StartPosition = masterStatus.Position

txMarkData = &TransactionMarkData{
ThreadID: currentThreadId,
LogFile: masterStatus.File,
LogPosition: masterStatus.Position,
}

if s.needTransactionMark() {
res := s.markTransactionStart(tx, txMarkData)
if errs := res.GetErrors(); len(errs) > 0 {
tx.Rollback()
log.Errorf("con:%d %v", s.sessionVars.ConnectionID, errs)
record.StageStatus = StatusExecFail
record.ExecComplete = false
for _, err := range errs {
if myErr, ok := err.(*mysqlDriver.MySQLError); ok {
s.appendErrorMsg(myErr.Message)
} else {
s.appendErrorMsg(err.Error())
}
}
return 2
}
}
}

record.Stage = StageExec
Expand All @@ -1172,8 +1203,13 @@ func (s *session) executeTransaction(records []*Record) int {
record.ExecTime = fmt.Sprintf("%.3f", time.Since(start).Seconds())
record.ExecTimestamp = time.Now().Unix()

if errs := res.GetErrors(); len(errs) > 0 {
tx.Rollback()
errs := res.GetErrors()

if len(errs) == 0 && s.opt.Backup && s.needTransactionMark() && i == len(records)-1 {
errs = s.markTransactionEnd(tx, txMarkData).GetErrors()
}

if len(errs) > 0 {
log.Errorf("con:%d %v", s.sessionVars.ConnectionID, errs)

for j := range records {
Expand Down Expand Up @@ -1223,7 +1259,7 @@ func (s *session) executeTransaction(records []*Record) int {
record.EndPosition = masterStatus.Position

// 开始位置和结束位置一样,无变更
if record.StartFile == record.EndFile &&
if !s.needTransactionMark() && record.StartFile == record.EndFile &&
record.StartPosition == record.EndPosition {

record.StartFile = ""
Expand Down
Loading