Skip to content

used uber atomic bool instead standard in lock/unlock db #580

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 23, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions database/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cassandra
import (
"errors"
"fmt"
"go.uber.org/atomic"
"io"
"io/ioutil"
nurl "net/url"
Expand Down Expand Up @@ -45,7 +46,7 @@ type Config struct {

type Cassandra struct {
session *gocql.Session
isLocked bool
isLocked atomic.Bool

// Open and WithInstance need to guarantee that config is never nil
config *Config
Expand Down Expand Up @@ -182,15 +183,16 @@ func (c *Cassandra) Close() error {
}

func (c *Cassandra) Lock() error {
if c.isLocked {
if !c.isLocked.CAS(false, true) {
return database.ErrLocked
}
c.isLocked = true
return nil
}

func (c *Cassandra) Unlock() error {
c.isLocked = false
if !c.isLocked.CAS(true, false) {
return database.ErrNotLocked
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion database/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (ch *ClickHouse) Lock() error {
}
func (ch *ClickHouse) Unlock() error {
if !ch.isLocked.CAS(true, false) {
return database.ErrLocked
return database.ErrNotLocked
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, this is my bug))

}

return nil
Expand Down
22 changes: 18 additions & 4 deletions database/cockroachdb/cockroachdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"go.uber.org/atomic"
"io"
"io/ioutil"
nurl "net/url"
Expand Down Expand Up @@ -46,7 +47,7 @@ type Config struct {

type CockroachDb struct {
db *sql.DB
isLocked bool
isLocked atomic.Bool

// Open and WithInstance need to guarantee that config is never nil
config *Config
Expand Down Expand Up @@ -152,15 +153,21 @@ func (c *CockroachDb) Close() error {
// Locking is done manually with a separate lock table. Implementing advisory locks in CRDB is being discussed
// See: https://github.com/cockroachdb/cockroach/issues/13546
func (c *CockroachDb) Lock() error {
if !c.isLocked.CAS(false, true) {
return database.ErrLocked
}

err := crdb.ExecuteTx(context.Background(), c.db, nil, func(tx *sql.Tx) (err error) {
aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName)
if err != nil {
c.isLocked.Store(false)
return err
}

query := "SELECT * FROM " + c.config.LockTable + " WHERE lock_id = $1"
rows, err := tx.Query(query, aid)
if err != nil {
c.isLocked.Store(false)
return database.Error{OrigErr: err, Err: "failed to fetch migration lock", Query: []byte(query)}
}
defer func() {
Expand All @@ -172,30 +179,37 @@ func (c *CockroachDb) Lock() error {
// If row exists at all, lock is present
locked := rows.Next()
if locked && !c.config.ForceLock {
c.isLocked.Store(false)
return database.ErrLocked
}

query = "INSERT INTO " + c.config.LockTable + " (lock_id) VALUES ($1)"
if _, err := tx.Exec(query, aid); err != nil {
c.isLocked.Store(false)
return database.Error{OrigErr: err, Err: "failed to set migration lock", Query: []byte(query)}
}

return nil
})

if err != nil {
c.isLocked.Store(false)
return err
} else {
c.isLocked = true
return nil
}
}

// Locking is done manually with a separate lock table. Implementing advisory locks in CRDB is being discussed
// See: https://github.com/cockroachdb/cockroach/issues/13546
func (c *CockroachDb) Unlock() error {
if !c.isLocked.CAS(true, false) {
return database.ErrNotLocked
}

aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName)
if err != nil {
c.isLocked.Store(true)
return err
}

Expand All @@ -208,14 +222,14 @@ func (c *CockroachDb) Unlock() error {
// https://github.com/cockroachdb/cockroach/blob/master/pkg/sql/pgwire/pgerror/codes.go
if e.Code == "42P01" {
// On drops, the lock table is fully removed; This is fine, and is a valid "unlocked" state for the schema
c.isLocked = false
return nil
}
}

c.isLocked.Store(true)
return database.Error{OrigErr: err, Err: "failed to release migration lock", Query: []byte(query)}
}

c.isLocked = false
return nil
}

Expand Down
10 changes: 6 additions & 4 deletions database/firebird/firebird.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/golang-migrate/migrate/v4/database"
"github.com/hashicorp/go-multierror"
_ "github.com/nakagami/firebirdsql"
"go.uber.org/atomic"
"io"
"io/ioutil"
nurl "net/url"
Expand All @@ -36,7 +37,7 @@ type Firebird struct {
// Locking and unlocking need to use the same connection
conn *sql.Conn
db *sql.DB
isLocked bool
isLocked atomic.Bool

// Open and WithInstance need to guarantee that config is never nil
config *Config
Expand Down Expand Up @@ -106,15 +107,16 @@ func (f *Firebird) Close() error {
}

func (f *Firebird) Lock() error {
if f.isLocked {
if !f.isLocked.CAS(false, true) {
return database.ErrLocked
}
f.isLocked = true
return nil
}

func (f *Firebird) Unlock() error {
f.isLocked = false
if !f.isLocked.CAS(true, false) {
return database.ErrNotLocked
}
return nil
}

Expand Down
24 changes: 13 additions & 11 deletions database/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"crypto/x509"
"database/sql"
"fmt"
"go.uber.org/atomic"
"io"
"io/ioutil"
nurl "net/url"
Expand Down Expand Up @@ -49,7 +50,7 @@ type Mysql struct {
// just do everything over a single conn anyway.
conn *sql.Conn
db *sql.DB
isLocked bool
isLocked atomic.Bool

config *Config
}
Expand Down Expand Up @@ -251,61 +252,62 @@ func (m *Mysql) Close() error {
}

func (m *Mysql) Lock() error {
if m.isLocked {
if !m.isLocked.CAS(false, true) {
return database.ErrLocked
}

if m.config.NoLock {
m.isLocked = true
return nil
}

aid, err := database.GenerateAdvisoryLockId(
fmt.Sprintf("%s:%s", m.config.DatabaseName, m.config.MigrationsTable))
if err != nil {
m.isLocked.Store(false)
return err
}

query := "SELECT GET_LOCK(?, 10)"
var success bool
if err := m.conn.QueryRowContext(context.Background(), query, aid).Scan(&success); err != nil {
m.isLocked.Store(false)
return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
}

if success {
m.isLocked = true
return nil
if !success {
m.isLocked.Store(false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment here about how removing the local lock in this situation is desired since the local lock is an optimization and it's not trying to reflect the state of the db lock.

Actually, let's add our own CAS wrapper to automatically restore the lock state on error (in database/util.go) so we don't need to remember to do it:

func casRestoreOnErr(lock *atomic.Bool, o, n bool, f func() error) error {
    if !lock.CAS(o, n) {
        return ErrLocked
    }
    if err := f(); err != nil {
        // Automatically unlock
        lock.Store(o)
        return err
    }
    return nil
}

Also, can you add tests for this wrapper?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. Of course I will do it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, review pls

return database.ErrLocked
}

return database.ErrLocked
return nil
}

func (m *Mysql) Unlock() error {
if !m.isLocked {
return nil
if !m.isLocked.CAS(true, false) {
return database.ErrNotLocked
}

if m.config.NoLock {
m.isLocked = false
return nil
}

aid, err := database.GenerateAdvisoryLockId(
fmt.Sprintf("%s:%s", m.config.DatabaseName, m.config.MigrationsTable))
if err != nil {
m.isLocked.Store(true)
return err
}

query := `SELECT RELEASE_LOCK(?)`
if _, err := m.conn.ExecContext(context.Background(), query, aid); err != nil {
m.isLocked.Store(true)
return &database.Error{OrigErr: err, Query: []byte(query)}
}

// NOTE: RELEASE_LOCK could return NULL or (or 0 if the code is changed),
// in which case isLocked should be true until the timeout expires -- synchronizing
// these states is likely not worth trying to do; reconsider the necessity of isLocked.

m.isLocked = false
return nil
}

Expand Down
16 changes: 9 additions & 7 deletions database/pgx/pgx.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"database/sql"
"fmt"
"go.uber.org/atomic"
"io"
"io/ioutil"
nurl "net/url"
Expand Down Expand Up @@ -58,7 +59,7 @@ type Postgres struct {
// Locking and unlocking need to use the same connection
conn *sql.Conn
db *sql.DB
isLocked bool
isLocked atomic.Bool

// Open and WithInstance need to guarantee that config is never nil
config *Config
Expand Down Expand Up @@ -220,40 +221,41 @@ func (p *Postgres) Close() error {

// https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS
func (p *Postgres) Lock() error {
if p.isLocked {
if !p.isLocked.CAS(false, true) {
return database.ErrLocked
}

aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
if err != nil {
p.isLocked.Store(false)
return err
}

// This will wait indefinitely until the lock can be acquired.
query := `SELECT pg_advisory_lock($1)`
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
p.isLocked.Store(false)
return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
}

p.isLocked = true
return nil
}

func (p *Postgres) Unlock() error {
if !p.isLocked {
return nil
if !p.isLocked.CAS(true, false) {
return database.ErrNotLocked
}

aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
if err != nil {
p.isLocked.Store(true)
return err
}

query := `SELECT pg_advisory_unlock($1)`
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
p.isLocked.Store(true)
return &database.Error{OrigErr: err, Query: []byte(query)}
}
p.isLocked = false
return nil
}

Expand Down
15 changes: 9 additions & 6 deletions database/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"database/sql"
"fmt"
"go.uber.org/atomic"
"io"
"io/ioutil"
nurl "net/url"
Expand Down Expand Up @@ -57,7 +58,7 @@ type Postgres struct {
// Locking and unlocking need to use the same connection
conn *sql.Conn
db *sql.DB
isLocked bool
isLocked atomic.Bool

// Open and WithInstance need to guarantee that config is never nil
config *Config
Expand Down Expand Up @@ -214,40 +215,42 @@ func (p *Postgres) Close() error {

// https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS
func (p *Postgres) Lock() error {
if p.isLocked {
if !p.isLocked.CAS(false, true) {
return database.ErrLocked
}

aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
if err != nil {
p.isLocked.Store(false)
return err
}

// This will wait indefinitely until the lock can be acquired.
query := `SELECT pg_advisory_lock($1)`
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
p.isLocked.Store(false)
return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
}

p.isLocked = true
return nil
}

func (p *Postgres) Unlock() error {
if !p.isLocked {
return nil
if !p.isLocked.CAS(true, false) {
return database.ErrNotLocked
}

aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
if err != nil {
p.isLocked.Store(true)
return err
}

query := `SELECT pg_advisory_unlock($1)`
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
p.isLocked.Store(true)
return &database.Error{OrigErr: err, Query: []byte(query)}
}
p.isLocked = false
return nil
}

Expand Down
Loading