Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions go/base/checksum_comparison.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
Copyright 2015 Shlomi Noach, courtesy Booking.com
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

package base

import (
"fmt"

"github.com/github/gh-ost/go/sql"
)

type ChecksumFunc func() (checksum string, err error)

// BinlogCoordinates described binary log coordinates in the form of log file & log position.
type ChecksumComparison struct {
Iteration int64
OriginalTableChecksumFunc ChecksumFunc
GhostTableChecksumFunc ChecksumFunc
MigrationIterationRangeMinValues *sql.ColumnValues
MigrationIterationRangeMaxValues *sql.ColumnValues
Attempts int
}

func NewChecksumComparison(
iteration int64,
originalTableChecksumFunc, ghostTableChecksumFunc ChecksumFunc,
rangeMinValues, rangeMaxValues *sql.ColumnValues,
) *ChecksumComparison {
return &ChecksumComparison{
Iteration: iteration,
OriginalTableChecksumFunc: originalTableChecksumFunc,
GhostTableChecksumFunc: ghostTableChecksumFunc,
MigrationIterationRangeMinValues: rangeMinValues,
MigrationIterationRangeMaxValues: rangeMaxValues,
Attempts: 0,
}
}

func (this *ChecksumComparison) IncrementAttempts() {
this.Attempts = this.Attempts + 1
}

func (this *ChecksumComparison) String() string {
return fmt.Sprintf("iteration: %d, range: [%s]..[%s], attempts: %d",
this.Iteration, this.MigrationIterationRangeMinValues, this.MigrationIterationRangeMaxValues, this.Attempts,
)
}
5 changes: 5 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type MigrationContext struct {

CountTableRows bool
ConcurrentCountTableRows bool
ChecksumData bool
AllowedRunningOnMaster bool
AllowedMasterMaster bool
SwitchToRowBinlogFormat bool
Expand Down Expand Up @@ -179,6 +180,9 @@ type MigrationContext struct {
pointOfInterestTimeMutex *sync.Mutex
CurrentLag int64
currentProgress uint64
PendingChecksumComparisons int64
SuccessfulChecksumComparisons int64
SubmittedChecksumComparisons int64
ThrottleHTTPStatusCode int64
controlReplicasLagResult mysql.ReplicationLagResult
TotalRowsCopied int64
Expand Down Expand Up @@ -207,6 +211,7 @@ type MigrationContext struct {
GhostTableVirtualColumns *sql.ColumnList
GhostTableUniqueKeys [](*sql.UniqueKey)
UniqueKey *sql.UniqueKey
GhostUniqueKey *sql.UniqueKey
SharedColumns *sql.ColumnList
ColumnRenameMap map[string]string
DroppedColumnsMap map[string]bool
Expand Down
1 change: 1 addition & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func main() {
flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)")
flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)")
flag.BoolVar(&migrationContext.ConcurrentCountTableRows, "concurrent-rowcount", true, "(with --exact-rowcount), when true (default): count rows after row-copy begins, concurrently, and adjust row estimate later on; when false: first count rows, then start row copy")
flag.BoolVar(&migrationContext.ChecksumData, "checksum-data", false, "if true, checksum original and ghost table shared data on the fly, fail migration if checksum mismatches. Checksum queries run on applier node (the master unless testing on replica)")
flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica")
flag.BoolVar(&migrationContext.AllowedMasterMaster, "allow-master-master", false, "explicitly allow running in a master-master setup")
flag.BoolVar(&migrationContext.NullableUniqueKeyAllowed, "allow-nullable-unique-key", false, "allow gh-ost to migrate based on a unique key with nullable columns. As long as no NULL values exist, this should be OK. If NULL values exist in chosen key, data may be corrupted. Use at your own risk!")
Expand Down
54 changes: 45 additions & 9 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

const (
atomicCutOverMagicHint = "ghost-cut-over-sentry"
groupConcatMaxLength = 1024 * 1024
)

type dmlBuildResult struct {
Expand Down Expand Up @@ -68,7 +69,7 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier {

func (this *Applier) InitDBConnections() (err error) {

applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName, fmt.Sprintf("group_concat_max_len=%d", groupConcatMaxLength))
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil {
return err
}
Expand Down Expand Up @@ -454,25 +455,25 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo

// ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where
// data actually gets copied from original table.
func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, checksumComparison *base.ChecksumComparison, err error) {
startTime := time.Now()
chunkSize = atomic.LoadInt64(&this.migrationContext.ChunkSize)

query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery(
insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := sql.BuildRangeInsertPreparedQuery(
this.migrationContext.DatabaseName,
this.migrationContext.OriginalTableName,
this.migrationContext.GetGhostTableName(),
this.migrationContext.SharedColumns.Names(),
this.migrationContext.MappedSharedColumns.Names(),
this.migrationContext.UniqueKey.Name,
&this.migrationContext.UniqueKey.Columns,
this.migrationContext.UniqueKey,
this.migrationContext.GhostUniqueKey,
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
this.migrationContext.GetIteration() == 0,
this.migrationContext.IsTransactionalTable(),
)
if err != nil {
return chunkSize, rowsAffected, duration, err
return chunkSize, rowsAffected, duration, checksumComparison, err
}

sqlResult, err := func() (gosql.Result, error) {
Expand All @@ -491,7 +492,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
if _, err := tx.Exec(sessionQuery); err != nil {
return nil, err
}
result, err := tx.Exec(query, explodedArgs...)
result, err := tx.Exec(insertQuery, explodedArgs...)
if err != nil {
return nil, err
}
Expand All @@ -502,7 +503,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
}()

if err != nil {
return chunkSize, rowsAffected, duration, err
return chunkSize, rowsAffected, duration, checksumComparison, err
}
rowsAffected, _ = sqlResult.RowsAffected()
duration = time.Since(startTime)
Expand All @@ -512,7 +513,38 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
this.migrationContext.MigrationIterationRangeMaxValues,
this.migrationContext.GetIteration(),
chunkSize)
return chunkSize, rowsAffected, duration, nil

var originalTableChecksumFunc base.ChecksumFunc = func() (checksum string, err error) {
err = this.db.QueryRow(originalChecksumQuery, explodedArgs...).Scan(&checksum)
return checksum, err
}
var ghostTableChecksumFunc base.ChecksumFunc = func() (checksum string, err error) {
err = this.db.QueryRow(ghostChecksumQuery, explodedArgs...).Scan(&checksum)
return checksum, err
}
checksumComparison = base.NewChecksumComparison(
this.migrationContext.GetIteration(),
originalTableChecksumFunc, ghostTableChecksumFunc,
this.migrationContext.MigrationIterationRangeMinValues,
this.migrationContext.MigrationIterationRangeMaxValues,
)

return chunkSize, rowsAffected, duration, checksumComparison, nil
}

func (this *Applier) CompareChecksum(checksumComparison *base.ChecksumComparison) error {
originalChecksum, err := checksumComparison.OriginalTableChecksumFunc()
if err != nil {
return err
}
ghostChecksum, err := checksumComparison.GhostTableChecksumFunc()
if err != nil {
return err
}
if originalChecksum != ghostChecksum {
return fmt.Errorf("Checksum failure. Iteration: %d", checksumComparison.Iteration)
}
return nil
}

// LockOriginalTable places a write lock on the original table
Expand Down Expand Up @@ -811,6 +843,10 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
}

tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2
if this.migrationContext.ChecksumData {
// Allow extra time for checksum to evaluate
tableLockTimeoutSeconds += this.migrationContext.CutOverLockTimeoutSeconds
}
this.migrationContext.Log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
query = fmt.Sprintf(`set session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
Expand Down
13 changes: 8 additions & 5 deletions go/logic/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
if err != nil {
return err
}
sharedUniqueKeys, err := this.getSharedUniqueKeys(this.migrationContext.OriginalTableUniqueKeys, this.migrationContext.GhostTableUniqueKeys)
sharedUniqueKeys, ghostSharedUniqueKeys, err := this.getSharedUniqueKeys(this.migrationContext.OriginalTableUniqueKeys, this.migrationContext.GhostTableUniqueKeys)
if err != nil {
return err
}
Expand All @@ -150,13 +150,15 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
}
if uniqueKeyIsValid {
this.migrationContext.UniqueKey = sharedUniqueKeys[i]
this.migrationContext.GhostUniqueKey = ghostSharedUniqueKeys[i]
break
}
}
if this.migrationContext.UniqueKey == nil {
return fmt.Errorf("No shared unique key can be found after ALTER! Bailing out")
}
this.migrationContext.Log.Infof("Chosen shared unique key is %s", this.migrationContext.UniqueKey.Name)
this.migrationContext.Log.Infof("Chosen shared unique key is %s. ghost unique key is %s", this.migrationContext.UniqueKey.Name, this.migrationContext.GhostUniqueKey.Name)
this.migrationContext.Log.Infof("Chosen shared unique key columns %+v. ghost unique key columns: %+v", this.migrationContext.UniqueKey.Columns.Names(), this.migrationContext.GhostUniqueKey.Columns.Names())
if this.migrationContext.UniqueKey.HasNullable {
if this.migrationContext.NullableUniqueKeyAllowed {
this.migrationContext.Log.Warningf("Chosen key (%s) has nullable columns. You have supplied with --allow-nullable-unique-key and so this migration proceeds. As long as there aren't NULL values in this key's column, migration should be fine. NULL values will corrupt migration's data", this.migrationContext.UniqueKey)
Expand Down Expand Up @@ -668,17 +670,18 @@ func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](*

// getSharedUniqueKeys returns the intersection of two given unique keys,
// testing by list of columns
func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [](*sql.UniqueKey)) (uniqueKeys [](*sql.UniqueKey), err error) {
func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [](*sql.UniqueKey)) (sharedUniqueKeys [](*sql.UniqueKey), ghostSharedUniqueKeys [](*sql.UniqueKey), err error) {
// We actually do NOT rely on key name, just on the set of columns. This is because maybe
// the ALTER is on the name itself...
for _, originalUniqueKey := range originalUniqueKeys {
for _, ghostUniqueKey := range ghostUniqueKeys {
if originalUniqueKey.Columns.EqualsByNames(&ghostUniqueKey.Columns) {
uniqueKeys = append(uniqueKeys, originalUniqueKey)
sharedUniqueKeys = append(sharedUniqueKeys, originalUniqueKey)
ghostSharedUniqueKeys = append(ghostSharedUniqueKeys, ghostUniqueKey)
}
}
}
return uniqueKeys, nil
return sharedUniqueKeys, ghostSharedUniqueKeys, nil
}

// getSharedColumns returns the intersection of two lists of columns in same order as the first list
Expand Down
Loading