Skip to content

Commit

Permalink
fixing local lock optimization implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
prinkov committed Jun 14, 2021
1 parent 9814da9 commit 55902af
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 28 deletions.
17 changes: 7 additions & 10 deletions database/pgx/pgx.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,44 +221,41 @@ func (p *Postgres) Close() error {

// https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS
func (p *Postgres) Lock() error {
if p.isLocked.Load() {
if !p.isLocked.CAS(false, true) {
return database.ErrLocked
}

aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
if err != nil {
p.isLocked.Store(false)
return err
}

// This will wait indefinitely until the lock can be acquired.
query := `SELECT pg_advisory_lock($1)`
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
p.isLocked.Store(false)
return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
}

if !p.isLocked.CAS(false, true) {
return database.ErrLocked
}
return nil
}

func (p *Postgres) Unlock() error {
if !p.isLocked.Load() {
return nil
if !p.isLocked.CAS(true, false) {
return database.ErrNotLocked
}

aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
if err != nil {
p.isLocked.Store(true)
return err
}

query := `SELECT pg_advisory_unlock($1)`
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
p.isLocked.Store(true)
return &database.Error{OrigErr: err, Query: []byte(query)}
}
if !p.isLocked.CAS(true, false) {
return database.ErrNotLocked
}
return nil
}

Expand Down
16 changes: 7 additions & 9 deletions database/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,44 +215,42 @@ func (p *Postgres) Close() error {

// https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS
func (p *Postgres) Lock() error {
if p.isLocked.Load() {
if !p.isLocked.CAS(false, true) {
return database.ErrLocked
}

aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
if err != nil {
p.isLocked.Store(false)
return err
}

// This will wait indefinitely until the lock can be acquired.
query := `SELECT pg_advisory_lock($1)`
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
p.isLocked.Store(false)
return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
}

if !p.isLocked.CAS(false, true) {
return database.ErrLocked
}
return nil
}

func (p *Postgres) Unlock() error {
if !p.isLocked.Load() {
return nil
if !p.isLocked.CAS(true, false) {
return database.ErrNotLocked
}

aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
if err != nil {
p.isLocked.Store(true)
return err
}

query := `SELECT pg_advisory_unlock($1)`
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
p.isLocked.Store(true)
return &database.Error{OrigErr: err, Query: []byte(query)}
}
if !p.isLocked.CAS(true, false) {
return database.ErrNotLocked
}
return nil
}

Expand Down
17 changes: 8 additions & 9 deletions database/sqlserver/sqlserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (ss *SQLServer) Close() error {

// Lock creates an advisory local on the database to prevent multiple migrations from running at the same time.
func (ss *SQLServer) Lock() error {
if ss.isLocked.Load() {
if !ss.isLocked.CAS(false, true) {
return database.ErrLocked
}

Expand All @@ -171,36 +171,35 @@ func (ss *SQLServer) Lock() error {

var status mssql.ReturnStatus
if _, err = ss.conn.ExecContext(context.Background(), query, aid, &status); err == nil && status > -1 {
if !ss.isLocked.CAS(false, true) {
return database.ErrLocked
}
return nil
} else if err != nil {
ss.isLocked.Store(false)
return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
} else {
ss.isLocked.Store(false)
return &database.Error{Err: fmt.Sprintf("try lock failed with error %v: %v", status, lockErrorMap[status]), Query: []byte(query)}
}
}

// Unlock froms the migration lock from the database
func (ss *SQLServer) Unlock() error {
if !ss.isLocked.Load() {
return nil
if !ss.isLocked.CAS(true, false) {
return database.ErrNotLocked
}

aid, err := database.GenerateAdvisoryLockId(ss.config.DatabaseName, ss.config.SchemaName)
if err != nil {
ss.isLocked.Store(true)
return err
}

// MS Docs: sp_releaseapplock: https://docs.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sp-releaseapplock-transact-sql?view=sql-server-2017
query := `EXEC sp_releaseapplock @Resource = @p1, @LockOwner = 'Session'`
if _, err := ss.conn.ExecContext(context.Background(), query, aid); err != nil {
ss.isLocked.Store(true)
return &database.Error{OrigErr: err, Query: []byte(query)}
}
if !ss.isLocked.CAS(true, false) {
return database.ErrNotLocked
}

return nil
}

Expand Down

0 comments on commit 55902af

Please sign in to comment.