diff --git a/go/base/checksum_comparison.go b/go/base/checksum_comparison.go new file mode 100644 index 000000000..adfe26c86 --- /dev/null +++ b/go/base/checksum_comparison.go @@ -0,0 +1,49 @@ +/* + Copyright 2015 Shlomi Noach, courtesy Booking.com + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package base + +import ( + "fmt" + + "github.com/github/gh-ost/go/sql" +) + +type ChecksumFunc func() (checksum string, err error) + +// ChecksumComparison pairs the original-table and ghost-table checksum functions of a single iteration with that iteration's range boundaries and the number of comparison attempts made so far. +type ChecksumComparison struct { + Iteration int64 + OriginalTableChecksumFunc ChecksumFunc + GhostTableChecksumFunc ChecksumFunc + MigrationIterationRangeMinValues *sql.ColumnValues + MigrationIterationRangeMaxValues *sql.ColumnValues + Attempts int +} + +func NewChecksumComparison( + iteration int64, + originalTableChecksumFunc, ghostTableChecksumFunc ChecksumFunc, + rangeMinValues, rangeMaxValues *sql.ColumnValues, +) *ChecksumComparison { + return &ChecksumComparison{ + Iteration: iteration, + OriginalTableChecksumFunc: originalTableChecksumFunc, + GhostTableChecksumFunc: ghostTableChecksumFunc, + MigrationIterationRangeMinValues: rangeMinValues, + MigrationIterationRangeMaxValues: rangeMaxValues, + Attempts: 0, + } +} + +func (this *ChecksumComparison) IncrementAttempts() { + this.Attempts = this.Attempts + 1 +} + +func (this *ChecksumComparison) String() string { + return fmt.Sprintf("iteration: %d, range: [%s]..[%s], attempts: %d", + this.Iteration, this.MigrationIterationRangeMinValues, this.MigrationIterationRangeMaxValues, this.Attempts, + ) +} diff --git a/go/base/context.go b/go/base/context.go index cee66efe7..533717896 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -84,6 +84,7 @@ type MigrationContext struct { CountTableRows bool ConcurrentCountTableRows bool + ChecksumData bool AllowedRunningOnMaster bool AllowedMasterMaster bool SwitchToRowBinlogFormat bool @@ -179,6 +180,9 @@ type MigrationContext 
struct { pointOfInterestTimeMutex *sync.Mutex CurrentLag int64 currentProgress uint64 + PendingChecksumComparisons int64 + SuccessfulChecksumComparisons int64 + SubmittedChecksumComparisons int64 ThrottleHTTPStatusCode int64 controlReplicasLagResult mysql.ReplicationLagResult TotalRowsCopied int64 @@ -207,6 +211,7 @@ type MigrationContext struct { GhostTableVirtualColumns *sql.ColumnList GhostTableUniqueKeys [](*sql.UniqueKey) UniqueKey *sql.UniqueKey + GhostUniqueKey *sql.UniqueKey SharedColumns *sql.ColumnList ColumnRenameMap map[string]string DroppedColumnsMap map[string]bool diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index d287d1176..9e408cf29 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -67,6 +67,7 @@ func main() { flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)") flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)") flag.BoolVar(&migrationContext.ConcurrentCountTableRows, "concurrent-rowcount", true, "(with --exact-rowcount), when true (default): count rows after row-copy begins, concurrently, and adjust row estimate later on; when false: first count rows, then start row copy") + flag.BoolVar(&migrationContext.ChecksumData, "checksum-data", false, "if true, checksum original and ghost table shared data on the fly, fail migration if checksum mismatches. Checksum queries run on applier node (the master unless testing on replica)") flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. 
Preferably it would run on a replica") flag.BoolVar(&migrationContext.AllowedMasterMaster, "allow-master-master", false, "explicitly allow running in a master-master setup") flag.BoolVar(&migrationContext.NullableUniqueKeyAllowed, "allow-nullable-unique-key", false, "allow gh-ost to migrate based on a unique key with nullable columns. As long as no NULL values exist, this should be OK. If NULL values exist in chosen key, data may be corrupted. Use at your own risk!") diff --git a/go/logic/applier.go b/go/logic/applier.go index 3b1b9bf06..3b1d9b61f 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -21,6 +21,7 @@ import ( const ( atomicCutOverMagicHint = "ghost-cut-over-sentry" + groupConcatMaxLength = 1024 * 1024 ) type dmlBuildResult struct { @@ -68,7 +69,7 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier { func (this *Applier) InitDBConnections() (err error) { - applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName) + applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName, fmt.Sprintf("group_concat_max_len=%d", groupConcatMaxLength)) if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil { return err } @@ -454,25 +455,25 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo // ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where // data actually gets copied from original table. 
-func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) { +func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, checksumComparison *base.ChecksumComparison, err error) { startTime := time.Now() chunkSize = atomic.LoadInt64(&this.migrationContext.ChunkSize) - query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery( + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := sql.BuildRangeInsertPreparedQuery( this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName(), this.migrationContext.SharedColumns.Names(), this.migrationContext.MappedSharedColumns.Names(), - this.migrationContext.UniqueKey.Name, - &this.migrationContext.UniqueKey.Columns, + this.migrationContext.UniqueKey, + this.migrationContext.GhostUniqueKey, this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(), this.migrationContext.GetIteration() == 0, this.migrationContext.IsTransactionalTable(), ) if err != nil { - return chunkSize, rowsAffected, duration, err + return chunkSize, rowsAffected, duration, checksumComparison, err } sqlResult, err := func() (gosql.Result, error) { @@ -491,7 +492,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected if _, err := tx.Exec(sessionQuery); err != nil { return nil, err } - result, err := tx.Exec(query, explodedArgs...) + result, err := tx.Exec(insertQuery, explodedArgs...) 
if err != nil { return nil, err } @@ -502,7 +503,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected }() if err != nil { - return chunkSize, rowsAffected, duration, err + return chunkSize, rowsAffected, duration, checksumComparison, err } rowsAffected, _ = sqlResult.RowsAffected() duration = time.Since(startTime) @@ -512,7 +513,38 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected this.migrationContext.MigrationIterationRangeMaxValues, this.migrationContext.GetIteration(), chunkSize) - return chunkSize, rowsAffected, duration, nil + + var originalTableChecksumFunc base.ChecksumFunc = func() (checksum string, err error) { + err = this.db.QueryRow(originalChecksumQuery, explodedArgs...).Scan(&checksum) + return checksum, err + } + var ghostTableChecksumFunc base.ChecksumFunc = func() (checksum string, err error) { + err = this.db.QueryRow(ghostChecksumQuery, explodedArgs...).Scan(&checksum) + return checksum, err + } + checksumComparison = base.NewChecksumComparison( + this.migrationContext.GetIteration(), + originalTableChecksumFunc, ghostTableChecksumFunc, + this.migrationContext.MigrationIterationRangeMinValues, + this.migrationContext.MigrationIterationRangeMaxValues, + ) + + return chunkSize, rowsAffected, duration, checksumComparison, nil +} + +func (this *Applier) CompareChecksum(checksumComparison *base.ChecksumComparison) error { + originalChecksum, err := checksumComparison.OriginalTableChecksumFunc() + if err != nil { + return err + } + ghostChecksum, err := checksumComparison.GhostTableChecksumFunc() + if err != nil { + return err + } + if originalChecksum != ghostChecksum { + return fmt.Errorf("Checksum failure. 
Iteration: %d", checksumComparison.Iteration) + } + return nil } // LockOriginalTable places a write lock on the original table @@ -811,6 +843,10 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke } tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2 + if this.migrationContext.ChecksumData { + // Allow extra time for checksum to evaluate + tableLockTimeoutSeconds += this.migrationContext.CutOverLockTimeoutSeconds + } this.migrationContext.Log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds) query = fmt.Sprintf(`set session lock_wait_timeout:=%d`, tableLockTimeoutSeconds) if _, err := tx.Exec(query); err != nil { diff --git a/go/logic/inspect.go b/go/logic/inspect.go index bc1083061..e7beeff1e 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -125,7 +125,7 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) { if err != nil { return err } - sharedUniqueKeys, err := this.getSharedUniqueKeys(this.migrationContext.OriginalTableUniqueKeys, this.migrationContext.GhostTableUniqueKeys) + sharedUniqueKeys, ghostSharedUniqueKeys, err := this.getSharedUniqueKeys(this.migrationContext.OriginalTableUniqueKeys, this.migrationContext.GhostTableUniqueKeys) if err != nil { return err } @@ -150,13 +150,15 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) { } if uniqueKeyIsValid { this.migrationContext.UniqueKey = sharedUniqueKeys[i] + this.migrationContext.GhostUniqueKey = ghostSharedUniqueKeys[i] break } } if this.migrationContext.UniqueKey == nil { return fmt.Errorf("No shared unique key can be found after ALTER! Bailing out") } - this.migrationContext.Log.Infof("Chosen shared unique key is %s", this.migrationContext.UniqueKey.Name) + this.migrationContext.Log.Infof("Chosen shared unique key is %s. 
ghost unique key is %s", this.migrationContext.UniqueKey.Name, this.migrationContext.GhostUniqueKey.Name) + this.migrationContext.Log.Infof("Chosen shared unique key columns %+v. ghost unique key columns: %+v", this.migrationContext.UniqueKey.Columns.Names(), this.migrationContext.GhostUniqueKey.Columns.Names()) if this.migrationContext.UniqueKey.HasNullable { if this.migrationContext.NullableUniqueKeyAllowed { this.migrationContext.Log.Warningf("Chosen key (%s) has nullable columns. You have supplied with --allow-nullable-unique-key and so this migration proceeds. As long as there aren't NULL values in this key's column, migration should be fine. NULL values will corrupt migration's data", this.migrationContext.UniqueKey) @@ -668,17 +670,18 @@ func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](* // getSharedUniqueKeys returns the intersection of two given unique keys, // testing by list of columns -func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [](*sql.UniqueKey)) (uniqueKeys [](*sql.UniqueKey), err error) { +func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [](*sql.UniqueKey)) (sharedUniqueKeys [](*sql.UniqueKey), ghostSharedUniqueKeys [](*sql.UniqueKey), err error) { // We actually do NOT rely on key name, just on the set of columns. This is because maybe // the ALTER is on the name itself... 
for _, originalUniqueKey := range originalUniqueKeys { for _, ghostUniqueKey := range ghostUniqueKeys { if originalUniqueKey.Columns.EqualsByNames(&ghostUniqueKey.Columns) { - uniqueKeys = append(uniqueKeys, originalUniqueKey) + sharedUniqueKeys = append(sharedUniqueKeys, originalUniqueKey) + ghostSharedUniqueKeys = append(ghostSharedUniqueKeys, ghostUniqueKey) } } } - return uniqueKeys, nil + return sharedUniqueKeys, ghostSharedUniqueKeys, nil } // getSharedColumns returns the intersection of two lists of columns in same order as the first list diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 70af08db9..943ffb034 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -11,6 +11,7 @@ import ( "math" "os" "strings" + "sync" "sync/atomic" "time" @@ -23,8 +24,9 @@ import ( type ChangelogState string const ( - GhostTableMigrated ChangelogState = "GhostTableMigrated" - AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" + GhostTableMigrated ChangelogState = "GhostTableMigrated" + AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" + checksumComparisonQueueBuffer = 10 ) func ReadChangelogState(s string) ChangelogState { @@ -80,6 +82,10 @@ type Migrator struct { copyRowsQueue chan tableWriteFunc applyEventsQueue chan *applyEventStruct + rowChecksumCompleteQueue chan bool + checksumComparisonQueue chan *base.ChecksumComparison + checksumComparisonMap map[int64]*base.ChecksumComparison + handledChangelogStates map[string]bool finishedMigrating int64 @@ -94,8 +100,13 @@ func NewMigrator(context *base.MigrationContext) *Migrator { rowCopyComplete: make(chan error), allEventsUpToLockProcessed: make(chan string), - copyRowsQueue: make(chan tableWriteFunc), - applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), + copyRowsQueue: make(chan tableWriteFunc), + applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), + + rowChecksumCompleteQueue: make(chan bool), + checksumComparisonQueue: make(chan 
*base.ChecksumComparison, checksumComparisonQueueBuffer), + checksumComparisonMap: make(map[int64]*base.ChecksumComparison), + handledChangelogStates: make(map[string]bool), finishedMigrating: 0, } @@ -202,6 +213,10 @@ func (this *Migrator) consumeRowCopyComplete() { }() } +func (this *Migrator) consumeChecksumComparisonsComplete() { + <-this.rowChecksumCompleteQueue +} + func (this *Migrator) canStopStreaming() bool { return atomic.LoadInt64(&this.migrationContext.CutOverCompleteFlag) != 0 } @@ -250,6 +265,40 @@ func (this *Migrator) listenOnPanicAbort() { this.migrationContext.Log.Fatale(err) } +func (this *Migrator) processChecksumComparisons() { + var completeOnce sync.Once + for { + func() { + // Avoid blocking. Only pull from the queue the available events + for { + select { + case checksumComparison := <-this.checksumComparisonQueue: + this.migrationContext.Log.Debugf("new checksums!!!!!!!!! %+v", checksumComparison) + this.checksumComparisonMap[checksumComparison.Iteration] = checksumComparison + default: + return + } + } + }() + // Iterate the pending checksums. 
Some of these have been pulled from the queue just above; + // others may be unsuccessful checksums from previous iterations + atomic.StoreInt64(&this.migrationContext.PendingChecksumComparisons, int64(len(this.checksumComparisonMap))) + for iteration, checksumComparison := range this.checksumComparisonMap { + if err := this.applier.CompareChecksum(checksumComparison); err != nil { + checksumComparison.IncrementAttempts() + } else { + atomic.AddInt64(&this.migrationContext.SuccessfulChecksumComparisons, 1) + delete(this.checksumComparisonMap, iteration) + } + } + atomic.StoreInt64(&this.migrationContext.PendingChecksumComparisons, int64(len(this.checksumComparisonMap))) + if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 { + completeOnce.Do(func() { go func() { this.rowChecksumCompleteQueue <- true }() }) // spawn the blocking send exactly once; later passes return from Do immediately instead of piling up goroutines + } + time.Sleep(250 * time.Millisecond) + } +} + // validateStatement validates the `alter` statement meets criteria. // At this time this means: // - column renames are approved @@ -388,6 +437,7 @@ func (this *Migrator) Migrate() (err error) { } go this.executeWriteFuncs() go this.iterateChunks() + go this.processChecksumComparisons() this.migrationContext.MarkRowCopyStartTime() go this.initiateStatus() @@ -397,6 +447,11 @@ func (this *Migrator) Migrate() (err error) { if err := this.hooksExecutor.onRowCopyComplete(); err != nil { return err } + if this.migrationContext.ChecksumData { + this.migrationContext.Log.Debugf("Operating until checksum comparison iteration is complete") + this.consumeChecksumComparisonsComplete() + this.migrationContext.Log.Infof("+ checksum comparison iteration compelete") + } this.printStatus(ForcePrintStatusRule) if err := this.hooksExecutor.onBeforeCutOver(); err != nil { @@ -531,6 +586,25 @@ func (this *Migrator) cutOver() (err error) { return this.migrationContext.Log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) } +func (this *Migrator) waitForChecksumToClear() (err error) { + timeout := 
time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds)) + for { + select { + case <-timeout.C: + { + return this.migrationContext.Log.Errorf("Timeout while waiting for checksums to clear. There are still checksum mismatches") + } + default: + { + if atomic.LoadInt64(&this.migrationContext.PendingChecksumComparisons) == 0 { // read the atomically published count; the map itself is only touched by processChecksumComparisons + return nil + } + time.Sleep(250 * time.Millisecond) + } + } + } +} + // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, // make sure the queue is drained. func (this *Migrator) waitForEventsUpToLock() (err error) { @@ -587,6 +661,9 @@ func (this *Migrator) cutOverTwoStep() (err error) { if err := this.retryOperation(this.waitForEventsUpToLock); err != nil { return err } + if err := this.retryOperation(this.waitForChecksumToClear); err != nil { + return err + } if err := this.retryOperation(this.applier.SwapTablesQuickAndBumpy); err != nil { return err } @@ -631,7 +708,9 @@ func (this *Migrator) atomicCutOver() (err error) { if err := this.waitForEventsUpToLock(); err != nil { return this.migrationContext.Log.Errore(err) } - + if err := this.waitForChecksumToClear(); err != nil { + return this.migrationContext.Log.Errore(err) + } // Step 2 // We now attempt an atomic RENAME on original & ghost tables, and expect it to block. 
this.migrationContext.RenameTablesStartTime = time.Now() @@ -958,10 +1037,11 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates() - status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, State: %s; ETA: %s", + status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Checksums: %d/%d,%d, Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, State: %s; ETA: %s", totalRowsCopied, rowsEstimate, progressPct, atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied), len(this.applyEventsQueue), cap(this.applyEventsQueue), + atomic.LoadInt64(&this.migrationContext.SuccessfulChecksumComparisons), atomic.LoadInt64(&this.migrationContext.SubmittedChecksumComparisons), atomic.LoadInt64(&this.migrationContext.PendingChecksumComparisons), base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), currentBinlogCoordinates, this.migrationContext.GetCurrentLagDuration().Seconds(), @@ -1130,10 +1210,16 @@ func (this *Migrator) iterateChunks() error { // _ghost_ table, which no longer exists. So, bothering error messages and all, but no damage. 
return nil } - _, rowsAffected, _, err := this.applier.ApplyIterationInsertQuery() + _, rowsAffected, _, checksumComparison, err := this.applier.ApplyIterationInsertQuery() if err != nil { return err // wrapping call will retry } + + if this.migrationContext.ChecksumData { + this.migrationContext.Log.Debugf("adding checksum") + atomic.AddInt64(&this.migrationContext.SubmittedChecksumComparisons, 1) + this.checksumComparisonQueue <- checksumComparison + } atomic.AddInt64(&this.migrationContext.TotalRowsCopied, rowsAffected) atomic.AddInt64(&this.migrationContext.Iteration, 1) return nil diff --git a/go/mysql/connection.go b/go/mysql/connection.go index 654998cc2..bd809963e 100644 --- a/go/mysql/connection.go +++ b/go/mysql/connection.go @@ -12,6 +12,7 @@ import ( "fmt" "io/ioutil" "net" + "strings" "github.com/go-sql-driver/mysql" ) @@ -102,7 +103,7 @@ func (this *ConnectionConfig) TLSConfig() *tls.Config { return this.tlsConfig } -func (this *ConnectionConfig) GetDBUri(databaseName string) string { +func (this *ConnectionConfig) GetDBUri(databaseName string, extraOptions ...string) string { hostname := this.Key.Hostname var ip = net.ParseIP(hostname) if (ip != nil) && (ip.To4() == nil) { @@ -116,5 +117,11 @@ func (this *ConnectionConfig) GetDBUri(databaseName string) string { if this.tlsConfig != nil { tlsOption = TLS_CONFIG_KEY } - return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?interpolateParams=%t&autocommit=true&charset=utf8mb4,utf8,latin1&tls=%s", this.User, this.Password, hostname, this.Key.Port, databaseName, interpolateParams, tlsOption) + extraOptionsParams := "" + if len(extraOptions) > 0 { + extraOptionsParams = fmt.Sprintf("&%s", strings.Join(extraOptions, "&")) + } + return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?interpolateParams=%t&autocommit=true&charset=utf8mb4,utf8,latin1&tls=%s%s", + this.User, this.Password, hostname, this.Key.Port, databaseName, interpolateParams, tlsOption, extraOptionsParams, + ) } diff --git a/go/sql/builder.go b/go/sql/builder.go index 
2c5a7ae28..d4a05d954 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -178,60 +178,135 @@ func BuildRangePreparedComparison(columns *ColumnList, args []interface{}, compa return BuildRangeComparison(columns.Names(), values, args, comparisonSign) } -func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) { +func BuildRangeInsertQuery( + databaseName, originalTableName, ghostTableName string, + sharedColumns []string, mappedSharedColumns []string, + uniqueKey *UniqueKey, + ghostUniqueKey *UniqueKey, + rangeStartValues, rangeEndValues []string, + rangeStartArgs, rangeEndArgs []interface{}, + includeRangeStartValues bool, transactionalTable bool, +) ( + insertQuery, originalChecksumQuery, ghostChecksumQuery string, explodedArgs []interface{}, err error, +) { if len(sharedColumns) == 0 { - return "", explodedArgs, fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery") + return "", "", "", explodedArgs, fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery") } databaseName = EscapeName(databaseName) originalTableName = EscapeName(originalTableName) ghostTableName = EscapeName(ghostTableName) - mappedSharedColumns = duplicateNames(mappedSharedColumns) - for i := range mappedSharedColumns { - mappedSharedColumns[i] = EscapeName(mappedSharedColumns[i]) - } - mappedSharedColumnsListing := strings.Join(mappedSharedColumns, ", ") - sharedColumns = duplicateNames(sharedColumns) for i := range sharedColumns { sharedColumns[i] = EscapeName(sharedColumns[i]) } sharedColumnsListing := strings.Join(sharedColumns, ", ") - uniqueKey = EscapeName(uniqueKey) + mappedSharedColumns = duplicateNames(mappedSharedColumns) + for i := range 
mappedSharedColumns { + mappedSharedColumns[i] = EscapeName(mappedSharedColumns[i]) + } + mappedSharedColumnsListing := strings.Join(mappedSharedColumns, ", ") + + uniqueKeyName := EscapeName(uniqueKey.Name) + ghostUniqueKeyName := EscapeName(ghostUniqueKey.Name) var minRangeComparisonSign ValueComparisonSign = GreaterThanComparisonSign if includeRangeStartValues { minRangeComparisonSign = GreaterThanOrEqualsComparisonSign } - rangeStartComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeStartValues, rangeStartArgs, minRangeComparisonSign) + rangeStartComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKey.Columns.Names(), rangeStartValues, rangeStartArgs, minRangeComparisonSign) if err != nil { - return "", explodedArgs, err + return "", "", "", explodedArgs, err } explodedArgs = append(explodedArgs, rangeExplodedArgs...) - rangeEndComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeEndValues, rangeEndArgs, LessThanOrEqualsComparisonSign) + rangeEndComparison, rangeExplodedArgs, err := BuildRangeComparison(uniqueKey.Columns.Names(), rangeEndValues, rangeEndArgs, LessThanOrEqualsComparisonSign) if err != nil { - return "", explodedArgs, err + return "", "", "", explodedArgs, err } explodedArgs = append(explodedArgs, rangeExplodedArgs...) 
transactionalClause := "" if transactionalTable { transactionalClause = "lock in share mode" } - result = fmt.Sprintf(` + insertQuery = fmt.Sprintf(` insert /* gh-ost %s.%s */ ignore into %s.%s (%s) (select %s from %s.%s force index (%s) where (%s and %s) %s ) `, databaseName, originalTableName, databaseName, ghostTableName, mappedSharedColumnsListing, - sharedColumnsListing, databaseName, originalTableName, uniqueKey, + sharedColumnsListing, databaseName, originalTableName, uniqueKeyName, rangeStartComparison, rangeEndComparison, transactionalClause) - return result, explodedArgs, nil + + // Now for checksum comparisons + + sharedColumns = duplicateNames(sharedColumns) // already escaped + for i := range sharedColumns { + sharedColumns[i] = fmt.Sprintf(`IFNULL(%s, 'NULL')`, sharedColumns[i]) + } + sharedColumnsListing = strings.Join(sharedColumns, ", ") + + mappedSharedColumns = duplicateNames(mappedSharedColumns) + for i := range mappedSharedColumns { + mappedSharedColumns[i] = fmt.Sprintf(`IFNULL(%s, 'NULL')`, mappedSharedColumns[i]) + } + mappedSharedColumnsListing = strings.Join(mappedSharedColumns, ", ") + + // escape unique key columns for comparison queries + uniqueKeyColumnNames := duplicateNames(uniqueKey.Columns.Names()) + for i := range uniqueKeyColumnNames { + uniqueKeyColumnNames[i] = EscapeName(uniqueKeyColumnNames[i]) + } + uniqueKeyColumnsListing := strings.Join(uniqueKeyColumnNames, ", ") + + originalChecksumQuery = fmt.Sprintf(` + select /* gh-ost checksum %s.%s */ + sha2( + group_concat( + sha2(concat_ws(',', %s), 256) order by %s + ), 256 + ) + from %s.%s force index (%s) + where (%s and %s) + `, databaseName, originalTableName, + sharedColumnsListing, uniqueKeyColumnsListing, + databaseName, originalTableName, uniqueKeyName, + rangeStartComparison, rangeEndComparison) + + ghostUniqueKeyColumnNames := duplicateNames(ghostUniqueKey.Columns.Names()) + for i := range ghostUniqueKeyColumnNames { + ghostUniqueKeyColumnNames[i] = 
EscapeName(ghostUniqueKeyColumnNames[i]) + } + ghostUniqueKeyColumnsListing := strings.Join(ghostUniqueKeyColumnNames, ", ") + + ghostChecksumQuery = fmt.Sprintf(` + select /* gh-ost checksum %s.%s */ + sha2( + group_concat( + sha2(concat_ws(',', %s), 256) order by %s + ), 256 + ) + from %s.%s force index (%s) + where (%s and %s) + `, databaseName, ghostTableName, + mappedSharedColumnsListing, ghostUniqueKeyColumnsListing, + databaseName, ghostTableName, ghostUniqueKeyName, + rangeStartComparison, rangeEndComparison) + return insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, nil } -func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) { - rangeStartValues := buildColumnsPreparedValues(uniqueKeyColumns) - rangeEndValues := buildColumnsPreparedValues(uniqueKeyColumns) - return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable) +func BuildRangeInsertPreparedQuery( + databaseName, originalTableName, ghostTableName string, + sharedColumns []string, mappedSharedColumns []string, + uniqueKey *UniqueKey, + ghostUniqueKey *UniqueKey, + rangeStartArgs, rangeEndArgs []interface{}, + includeRangeStartValues bool, transactionalTable bool, +) ( + insertQuery, originalChecksumQuery, ghostChecksumQuery string, explodedArgs []interface{}, err error, +) { + rangeStartValues := buildColumnsPreparedValues(&uniqueKey.Columns) + rangeEndValues := buildColumnsPreparedValues(&uniqueKey.Columns) + return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, 
mappedSharedColumns, uniqueKey, ghostUniqueKey, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable) } func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) { diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index a178c4ccb..60c94ddb6 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -165,14 +165,21 @@ func TestBuildRangeInsertQuery(t *testing.T) { ghostTableName := "ghost" sharedColumns := []string{"id", "name", "position"} { - uniqueKey := "PRIMARY" - uniqueKeyColumns := NewColumnList([]string{"id"}) + uniqueKey := &UniqueKey{ + Name: "PRIMARY", + Columns: *NewColumnList([]string{"id"}), + } + ghostUniqueKey := &UniqueKey{ + Name: "PRIMARY", + Columns: *NewColumnList([]string{"id"}), + } rangeStartValues := []string{"@v1s"} rangeEndValues := []string{"@v1e"} rangeStartArgs := []interface{}{3} rangeEndArgs := []interface{}{103} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery( + databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, ghostUniqueKey, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position) @@ -180,18 +187,49 @@ func TestBuildRangeInsertQuery(t *testing.T) { where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e)))) ) ` - test.S(t).ExpectEquals(normalizeQuery(query), 
normalizeQuery(expected)) + test.S(t).ExpectEquals(normalizeQuery(insertQuery), normalizeQuery(expected)) + + expectedOriginalChecksumQuery := ` + select /* gh-ost checksum mydb.tbl */ + sha2( + group_concat( + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(position, 'NULL')), 256) order by id + ), 256 + ) + from mydb.tbl force index (PRIMARY) + where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), normalizeQuery(expectedOriginalChecksumQuery)) + + expectedGhostChecksumQuery := ` + select /* gh-ost checksum mydb.ghost */ + sha2( + group_concat( + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(position, 'NULL')), 256) order by id + ), 256 + ) + from mydb.ghost force index (PRIMARY) + where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(ghostChecksumQuery), normalizeQuery(expectedGhostChecksumQuery)) test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3, 3, 103, 103})) } { - uniqueKey := "name_position_uidx" - uniqueKeyColumns := NewColumnList([]string{"name", "position"}) + uniqueKey := &UniqueKey{ + Name: "name_position_uidx", + Columns: *NewColumnList([]string{"name", "position"}), + } + ghostUniqueKey := &UniqueKey{ + Name: "name_position_uidx", + Columns: *NewColumnList([]string{"name", "position"}), + } rangeStartValues := []string{"@v1s", "@v2s"} rangeEndValues := []string{"@v1e", "@v2e"} rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery( + databaseName, originalTableName, ghostTableName, 
sharedColumns, sharedColumns, uniqueKey, ghostUniqueKey, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position) @@ -199,7 +237,32 @@ func TestBuildRangeInsertQuery(t *testing.T) { where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e)))) ) ` - test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) + test.S(t).ExpectEquals(normalizeQuery(insertQuery), normalizeQuery(expected)) + + expectedOriginalChecksumQuery := ` + select /* gh-ost checksum mydb.tbl */ + sha2( + group_concat( + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(position, 'NULL')), 256) order by name, position + ), 256 + ) + from mydb.tbl force index (name_position_uidx) + where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), normalizeQuery(expectedOriginalChecksumQuery)) + + expectedGhostChecksumQuery := ` + select /* gh-ost checksum mydb.ghost */ + sha2( + group_concat( + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(position, 'NULL')), 256) order by name, position + ), 256 + ) + from mydb.ghost force index (name_position_uidx) + where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(ghostChecksumQuery), normalizeQuery(expectedGhostChecksumQuery)) + test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3, 3, 17, 3, 17, 103, 
103, 117, 103, 117})) } } @@ -211,14 +274,20 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { sharedColumns := []string{"id", "name", "position"} mappedSharedColumns := []string{"id", "name", "location"} { - uniqueKey := "PRIMARY" - uniqueKeyColumns := NewColumnList([]string{"id"}) + uniqueKey := &UniqueKey{ + Name: "PRIMARY", + Columns: *NewColumnList([]string{"id"}), + } + ghostUniqueKey := &UniqueKey{ + Name: "PRIMARY", + Columns: *NewColumnList([]string{"id"}), + } rangeStartValues := []string{"@v1s"} rangeEndValues := []string{"@v1e"} rangeStartArgs := []interface{}{3} rangeEndArgs := []interface{}{103} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, ghostUniqueKey, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, location) @@ -226,18 +295,50 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e)))) ) ` - test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) + test.S(t).ExpectEquals(normalizeQuery(insertQuery), normalizeQuery(expected)) + + expectedOriginalChecksumQuery := ` + select /* gh-ost checksum mydb.tbl */ + sha2( + group_concat( + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(position, 'NULL')), 256) order by id + ), 256 + ) + from mydb.tbl force index (PRIMARY) + where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), 
normalizeQuery(expectedOriginalChecksumQuery)) + + expectedGhostChecksumQuery := ` + select /* gh-ost checksum mydb.ghost */ + sha2( + group_concat( + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(location, 'NULL')), 256) order by id + ), 256 + ) + from mydb.ghost force index (PRIMARY) + where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(ghostChecksumQuery), normalizeQuery(expectedGhostChecksumQuery)) + test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3, 3, 103, 103})) } { - uniqueKey := "name_position_uidx" - uniqueKeyColumns := NewColumnList([]string{"name", "position"}) + uniqueKey := &UniqueKey{ + Name: "name_position_uidx", + Columns: *NewColumnList([]string{"name", "position"}), + } + ghostUniqueKey := &UniqueKey{ + Name: "name_position_uidx", + Columns: *NewColumnList([]string{"name", "location"}), + } rangeStartValues := []string{"@v1s", "@v2s"} rangeEndValues := []string{"@v1e", "@v2e"} rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) + insertQuery, originalChecksumQuery, ghostChecksumQuery, explodedArgs, err := BuildRangeInsertQuery( + databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, ghostUniqueKey, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, location) @@ -245,7 +346,32 @@ func TestBuildRangeInsertQueryRenameMap(t *testing.T) { where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name 
= @v1e) and (position = @v2e)))) ) ` - test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) + test.S(t).ExpectEquals(normalizeQuery(insertQuery), normalizeQuery(expected)) + + expectedOriginalChecksumQuery := ` + select /* gh-ost checksum mydb.tbl */ + sha2( + group_concat( + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(position, 'NULL')), 256) order by name, position + ), 256 + ) + from mydb.tbl force index (name_position_uidx) + where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(originalChecksumQuery), normalizeQuery(expectedOriginalChecksumQuery)) + + expectedGhostChecksumQuery := ` + select /* gh-ost checksum mydb.ghost */ + sha2( + group_concat( + sha2(concat_ws(',', IFNULL(id, 'NULL'), IFNULL(name, 'NULL'), IFNULL(location, 'NULL')), 256) order by name, location + ), 256 + ) + from mydb.ghost force index (name_position_uidx) + where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e)))) + ` + test.S(t).ExpectEquals(normalizeQuery(ghostChecksumQuery), normalizeQuery(expectedGhostChecksumQuery)) + test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3, 3, 17, 3, 17, 103, 103, 117, 103, 117})) } } @@ -256,12 +382,19 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) { ghostTableName := "ghost" sharedColumns := []string{"id", "name", "position"} { - uniqueKey := "name_position_uidx" - uniqueKeyColumns := NewColumnList([]string{"name", "position"}) + uniqueKey := &UniqueKey{ + Name: "name_position_uidx", + Columns: *NewColumnList([]string{"name", "position"}), + } + ghostUniqueKey := &UniqueKey{ + Name: "name_position_uidx", + Columns: 
*NewColumnList([]string{"name", "position"}), + } rangeStartArgs := []interface{}{3, 17} rangeEndArgs := []interface{}{103, 117} - query, explodedArgs, err := BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, true, true) + query, _, _, explodedArgs, err := BuildRangeInsertPreparedQuery( + databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, ghostUniqueKey, rangeStartArgs, rangeEndArgs, true, true) test.S(t).ExpectNil(err) expected := ` insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position) diff --git a/localtests/test.sh b/localtests/test.sh index d4b3f1723..938dda53b 100755 --- a/localtests/test.sh +++ b/localtests/test.sh @@ -148,6 +148,7 @@ test_single() { --alter='engine=innodb' \ --exact-rowcount \ --assume-rbr \ + --checksum-data \ --initially-drop-old-table \ --initially-drop-ghost-table \ --throttle-query='select timestampdiff(second, min(last_update), now()) < 5 from _gh_ost_test_ghc' \ @@ -229,11 +230,12 @@ build_binary() { echo "Using binary: $ghost_binary" return 0 fi - go build -o $ghost_binary go/cmd/gh-ost/main.go + ./script/build if [ $? -ne 0 ] ; then echo "Build failure" exit 1 fi + cp bin/gh-ost $ghost_binary } test_all() {