Skip to content

Commit

Permalink
Merge pull request #526 from github/teardown-contrib
Browse files Browse the repository at this point in the history
Implement teardown for gh-ost library
  • Loading branch information
Shlomi Noach authored Jan 14, 2018
2 parents 7133a11 + 6578877 commit f0c3028
Show file tree
Hide file tree
Showing 12 changed files with 204 additions and 88 deletions.
20 changes: 6 additions & 14 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"sync/atomic"
"time"

"github.com/satori/go.uuid"

"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"

Expand Down Expand Up @@ -71,6 +73,8 @@ func NewThrottleCheckResult(throttle bool, reason string, reasonHint ThrottleRea
// MigrationContext has the general, global state of migration. It is used by
// all components throughout the migration process.
type MigrationContext struct {
Uuid string

DatabaseName string
OriginalTableName string
AlterStatement string
Expand Down Expand Up @@ -195,8 +199,6 @@ type MigrationContext struct {
ForceTmpTableName string

recentBinlogCoordinates mysql.BinlogCoordinates

CanStopStreaming func() bool
}

type ContextConfig struct {
Expand All @@ -212,14 +214,9 @@ type ContextConfig struct {
}
}

var context *MigrationContext

func init() {
context = newMigrationContext()
}

func newMigrationContext() *MigrationContext {
func NewMigrationContext() *MigrationContext {
return &MigrationContext{
Uuid: uuid.NewV4().String(),
defaultNumRetries: 60,
ChunkSize: 1000,
InspectorConnectionConfig: mysql.NewConnectionConfig(),
Expand All @@ -239,11 +236,6 @@ func newMigrationContext() *MigrationContext {
}
}

// GetMigrationContext
func GetMigrationContext() *MigrationContext {
return context
}

func getSafeTableName(baseName string, suffix string) string {
name := fmt.Sprintf("_%s_%s", baseName, suffix)
if len(name) <= mysql.MaxTableNameLength {
Expand Down
10 changes: 5 additions & 5 deletions go/base/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,27 @@ func init() {

func TestGetTableNames(t *testing.T) {
{
context = newMigrationContext()
context := NewMigrationContext()
context.OriginalTableName = "some_table"
test.S(t).ExpectEquals(context.GetOldTableName(), "_some_table_del")
test.S(t).ExpectEquals(context.GetGhostTableName(), "_some_table_gho")
test.S(t).ExpectEquals(context.GetChangelogTableName(), "_some_table_ghc")
}
{
context = newMigrationContext()
context := NewMigrationContext()
context.OriginalTableName = "a123456789012345678901234567890123456789012345678901234567890"
test.S(t).ExpectEquals(context.GetOldTableName(), "_a1234567890123456789012345678901234567890123456789012345678_del")
test.S(t).ExpectEquals(context.GetGhostTableName(), "_a1234567890123456789012345678901234567890123456789012345678_gho")
test.S(t).ExpectEquals(context.GetChangelogTableName(), "_a1234567890123456789012345678901234567890123456789012345678_ghc")
}
{
context = newMigrationContext()
context := NewMigrationContext()
context.OriginalTableName = "a123456789012345678901234567890123456789012345678901234567890123"
oldTableName := context.GetOldTableName()
test.S(t).ExpectEquals(oldTableName, "_a1234567890123456789012345678901234567890123456789012345678_del")
}
{
context = newMigrationContext()
context := NewMigrationContext()
context.OriginalTableName = "a123456789012345678901234567890123456789012345678901234567890123"
context.TimestampOldTable = true
longForm := "Jan 2, 2006 at 3:04pm (MST)"
Expand All @@ -48,7 +48,7 @@ func TestGetTableNames(t *testing.T) {
test.S(t).ExpectEquals(oldTableName, "_a1234567890123456789012345678901234567890123_20130203195400_del")
}
{
context = newMigrationContext()
context := NewMigrationContext()
context.OriginalTableName = "foo_bar_baz"
context.ForceTmpTableName = "tmp"
test.S(t).ExpectEquals(context.GetOldTableName(), "_tmp_del")
Expand Down
22 changes: 8 additions & 14 deletions go/binlog/gomysql_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,26 @@ type GoMySQLReader struct {
currentCoordinates mysql.BinlogCoordinates
currentCoordinatesMutex *sync.Mutex
LastAppliedRowsEventHint mysql.BinlogCoordinates
MigrationContext *base.MigrationContext
}

func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *GoMySQLReader, err error) {
func NewGoMySQLReader(migrationContext *base.MigrationContext) (binlogReader *GoMySQLReader, err error) {
binlogReader = &GoMySQLReader{
connectionConfig: connectionConfig,
connectionConfig: migrationContext.InspectorConnectionConfig,
currentCoordinates: mysql.BinlogCoordinates{},
currentCoordinatesMutex: &sync.Mutex{},
binlogSyncer: nil,
binlogStreamer: nil,
MigrationContext: base.GetMigrationContext(),
}

serverId := uint32(binlogReader.MigrationContext.ReplicaServerId)
serverId := uint32(migrationContext.ReplicaServerId)

binlogSyncerConfig := &replication.BinlogSyncerConfig{
ServerID: serverId,
Flavor: "mysql",
Host: connectionConfig.Key.Hostname,
Port: uint16(connectionConfig.Key.Port),
User: connectionConfig.User,
Password: connectionConfig.Password,
Host: binlogReader.connectionConfig.Key.Hostname,
Port: uint16(binlogReader.connectionConfig.Key.Port),
User: binlogReader.connectionConfig.User,
Password: binlogReader.connectionConfig.Password,
}
binlogReader.binlogSyncer = replication.NewBinlogSyncer(binlogSyncerConfig)

Expand Down Expand Up @@ -160,10 +158,6 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
}

func (this *GoMySQLReader) Close() error {
// Historically there was a:
// this.binlogSyncer.Close()
// here. A new go-mysql version closes the binlog syncer connection independently.
// I will go against the sacred rules of comments and just leave this here.
// This is the year 2017. Let's see what year these comments get deleted.
this.binlogSyncer.Close()
return nil
}
4 changes: 2 additions & 2 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func acceptSignals(migrationContext *base.MigrationContext) {

// main is the application's entry point. It will either spawn a CLI or HTTP interfaces.
func main() {
migrationContext := base.GetMigrationContext()
migrationContext := base.NewMigrationContext()

flag.StringVar(&migrationContext.InspectorConnectionConfig.Key.Hostname, "host", "127.0.0.1", "MySQL hostname (preferably a replica, not the master)")
flag.StringVar(&migrationContext.AssumeMasterHostname, "assume-master-host", "", "(optional) explicitly tell gh-ost the identity of the master. Format: some.host.com[:port] This is useful in master-master setups where you wish to pick an explicit master, or in a tungsten-replicator where gh-ost is unable to determine the master")
Expand Down Expand Up @@ -242,7 +242,7 @@ func main() {
log.Infof("starting gh-ost %+v", AppVersion)
acceptSignals(migrationContext)

migrator := logic.NewMigrator()
migrator := logic.NewMigrator(migrationContext)
err := migrator.Migrate()
if err != nil {
migrator.ExecOnFailureHook()
Expand Down
31 changes: 22 additions & 9 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,26 +52,29 @@ func newDmlBuildResultError(err error) *dmlBuildResult {
// Applier is the one to actually write row data and apply binlog events onto the ghost table.
// It is where the ghost & changelog tables get created. It is where the cut-over phase happens.
type Applier struct {
connectionConfig *mysql.ConnectionConfig
db *gosql.DB
singletonDB *gosql.DB
migrationContext *base.MigrationContext
connectionConfig *mysql.ConnectionConfig
db *gosql.DB
singletonDB *gosql.DB
migrationContext *base.MigrationContext
finishedMigrating int64
}

func NewApplier() *Applier {
func NewApplier(migrationContext *base.MigrationContext) *Applier {
return &Applier{
connectionConfig: base.GetMigrationContext().ApplierConnectionConfig,
migrationContext: base.GetMigrationContext(),
connectionConfig: migrationContext.ApplierConnectionConfig,
migrationContext: migrationContext,
finishedMigrating: 0,
}
}

func (this *Applier) InitDBConnections() (err error) {

applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
if this.db, _, err = sqlutils.GetDB(applierUri); err != nil {
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil {
return err
}
singletonApplierUri := fmt.Sprintf("%s?timeout=0", applierUri)
if this.singletonDB, _, err = sqlutils.GetDB(singletonApplierUri); err != nil {
if this.singletonDB, _, err = mysql.GetDB(this.migrationContext.Uuid, singletonApplierUri); err != nil {
return err
}
this.singletonDB.SetMaxOpenConns(1)
Expand Down Expand Up @@ -320,6 +323,9 @@ func (this *Applier) InitiateHeartbeat() {

heartbeatTick := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
for range heartbeatTick {
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
return
}
// Generally speaking, we would issue a goroutine, but I'd actually rather
// have this block the loop rather than spam the master in the event something
// goes wrong
Expand Down Expand Up @@ -1074,3 +1080,10 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
log.Debugf("ApplyDMLEventQueries() applied %d events in one transaction", len(dmlEvents))
return nil
}

func (this *Applier) Teardown() {
log.Debugf("Tearing down...")
this.db.Close()
this.singletonDB.Close()
atomic.StoreInt64(&this.finishedMigrating, 1)
}
4 changes: 2 additions & 2 deletions go/logic/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ type HooksExecutor struct {
migrationContext *base.MigrationContext
}

func NewHooksExecutor() *HooksExecutor {
func NewHooksExecutor(migrationContext *base.MigrationContext) *HooksExecutor {
return &HooksExecutor{
migrationContext: base.GetMigrationContext(),
migrationContext: migrationContext,
}
}

Expand Down
28 changes: 21 additions & 7 deletions go/logic/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,30 @@ const startSlavePostWaitMilliseconds = 500 * time.Millisecond
// Inspector reads data from the read-MySQL-server (typically a replica, but can be the master)
// It is used for gaining initial status and structure, and later also follow up on progress and changelog
type Inspector struct {
connectionConfig *mysql.ConnectionConfig
db *gosql.DB
migrationContext *base.MigrationContext
connectionConfig *mysql.ConnectionConfig
db *gosql.DB
informationSchemaDb *gosql.DB
migrationContext *base.MigrationContext
}

func NewInspector() *Inspector {
func NewInspector(migrationContext *base.MigrationContext) *Inspector {
return &Inspector{
connectionConfig: base.GetMigrationContext().InspectorConnectionConfig,
migrationContext: base.GetMigrationContext(),
connectionConfig: migrationContext.InspectorConnectionConfig,
migrationContext: migrationContext,
}
}

func (this *Inspector) InitDBConnections() (err error) {
inspectorUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
if this.db, _, err = sqlutils.GetDB(inspectorUri); err != nil {
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, inspectorUri); err != nil {
return err
}

informationSchemaUri := this.connectionConfig.GetDBUri("information_schema")
if this.informationSchemaDb, _, err = mysql.GetDB(this.migrationContext.Uuid, informationSchemaUri); err != nil {
return err
}

if err := this.validateConnection(); err != nil {
return err
}
Expand Down Expand Up @@ -749,7 +756,14 @@ func (this *Inspector) getMasterConnectionConfig() (applierConfig *mysql.Connect

func (this *Inspector) getReplicationLag() (replicationLag time.Duration, err error) {
replicationLag, err = mysql.GetReplicationLag(
this.informationSchemaDb,
this.migrationContext.InspectorConnectionConfig,
)
return replicationLag, err
}

func (this *Inspector) Teardown() {
this.db.Close()
this.informationSchemaDb.Close()
return
}
Loading

0 comments on commit f0c3028

Please sign in to comment.