From 9814da90d40c151a54937c3a4c2d24081b7ed190 Mon Sep 17 00:00:00 2001 From: Alexey Prinkov Date: Tue, 8 Jun 2021 18:15:10 +0300 Subject: [PATCH 01/11] used uber atomic bool instead standard in lock/unlock db --- database/cassandra/cassandra.go | 10 ++++++---- database/clickhouse/clickhouse.go | 2 +- database/cockroachdb/cockroachdb.go | 15 +++++++++++---- database/firebird/firebird.go | 10 ++++++---- database/mysql/mysql.go | 25 +++++++++++++++---------- database/pgx/pgx.go | 15 ++++++++++----- database/postgres/postgres.go | 15 ++++++++++----- database/ql/ql.go | 11 +++++------ database/redshift/redshift.go | 10 ++++++---- database/snowflake/snowflake.go | 10 ++++++---- database/sqlcipher/sqlcipher.go | 11 +++++------ database/sqlite3/sqlite3.go | 11 +++++------ database/sqlserver/sqlserver.go | 16 ++++++++++------ database/stub/stub.go | 10 ++++++---- 14 files changed, 102 insertions(+), 69 deletions(-) diff --git a/database/cassandra/cassandra.go b/database/cassandra/cassandra.go index 15e276892..58713e72b 100644 --- a/database/cassandra/cassandra.go +++ b/database/cassandra/cassandra.go @@ -3,6 +3,7 @@ package cassandra import ( "errors" "fmt" + "go.uber.org/atomic" "io" "io/ioutil" nurl "net/url" @@ -45,7 +46,7 @@ type Config struct { type Cassandra struct { session *gocql.Session - isLocked bool + isLocked atomic.Bool // Open and WithInstance need to guarantee that config is never nil config *Config @@ -182,15 +183,16 @@ func (c *Cassandra) Close() error { } func (c *Cassandra) Lock() error { - if c.isLocked { + if !c.isLocked.CAS(false, true) { return database.ErrLocked } - c.isLocked = true return nil } func (c *Cassandra) Unlock() error { - c.isLocked = false + if !c.isLocked.CAS(true, false) { + return database.ErrNotLocked + } return nil } diff --git a/database/clickhouse/clickhouse.go b/database/clickhouse/clickhouse.go index b612aec59..625658ea8 100644 --- a/database/clickhouse/clickhouse.go +++ b/database/clickhouse/clickhouse.go @@ -285,7 +285,7 @@ func (ch *ClickHouse) Lock() error { } func (ch *ClickHouse) Unlock() error { if !ch.isLocked.CAS(true, false) { - return database.ErrLocked + return database.ErrNotLocked } return nil diff --git a/database/cockroachdb/cockroachdb.go b/database/cockroachdb/cockroachdb.go index 24cc6471f..582bd96b4 100644 --- a/database/cockroachdb/cockroachdb.go +++ b/database/cockroachdb/cockroachdb.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "go.uber.org/atomic" "io" "io/ioutil" nurl "net/url" @@ -46,7 +47,7 @@ type Config struct { type CockroachDb struct { db *sql.DB - isLocked bool + isLocked atomic.Bool // Open and WithInstance need to guarantee that config is never nil config *Config @@ -186,7 +187,9 @@ func (c *CockroachDb) Lock() error { if err != nil { return err } else { - c.isLocked = true + if !c.isLocked.CAS(false, true) { + return database.ErrLocked + } return nil } } @@ -208,14 +211,18 @@ func (c *CockroachDb) Unlock() error { // https://github.com/cockroachdb/cockroach/blob/master/pkg/sql/pgwire/pgerror/codes.go if e.Code == "42P01" { // On drops, the lock table is fully removed; This is fine, and is a valid "unlocked" state for the schema - c.isLocked = false + if !c.isLocked.CAS(true, false) { + return database.ErrNotLocked + } return nil } } return database.Error{OrigErr: err, Err: "failed to release migration lock", Query: []byte(query)} } - c.isLocked = false + if !c.isLocked.CAS(true, false) { + return database.ErrNotLocked + } return nil } diff --git a/database/firebird/firebird.go b/database/firebird/firebird.go index ca393e22b..41ccc33d3 100644 --- a/database/firebird/firebird.go +++ b/database/firebird/firebird.go @@ -10,6 +10,7 @@ import ( "github.com/golang-migrate/migrate/v4/database" "github.com/hashicorp/go-multierror" _ "github.com/nakagami/firebirdsql" + "go.uber.org/atomic" "io" "io/ioutil" nurl "net/url" @@ -36,7 +37,7 @@ type Firebird struct { // Locking and unlocking need to use the same connection conn *sql.Conn db *sql.DB - isLocked bool + isLocked atomic.Bool // Open and WithInstance need to guarantee that config is never nil config *Config @@ -106,15 +107,16 @@ func (f *Firebird) Close() error { } func (f *Firebird) Lock() error { - if f.isLocked { + if !f.isLocked.CAS(false, true) { return database.ErrLocked } - f.isLocked = true return nil } func (f *Firebird) Unlock() error { - f.isLocked = false + if !f.isLocked.CAS(true, false) { + return database.ErrNotLocked + } return nil } diff --git a/database/mysql/mysql.go b/database/mysql/mysql.go index 586df2494..98783ec99 100644 --- a/database/mysql/mysql.go +++ b/database/mysql/mysql.go @@ -8,6 +8,7 @@ import ( "crypto/x509" "database/sql" "fmt" + "go.uber.org/atomic" "io" "io/ioutil" nurl "net/url" @@ -49,7 +50,7 @@ type Mysql struct { // just do everything over a single conn anyway. conn *sql.Conn db *sql.DB - isLocked bool + isLocked atomic.Bool config *Config } @@ -251,12 +252,10 @@ func (m *Mysql) Close() error { } func (m *Mysql) Lock() error { - if m.isLocked { - return database.ErrLocked - } - if m.config.NoLock { - m.isLocked = true + if !m.isLocked.CAS(false, true) { + return database.ErrLocked + } return nil } @@ -273,7 +272,9 @@ func (m *Mysql) Lock() error { } if success { - m.isLocked = true + if !m.isLocked.CAS(false, true) { + return database.ErrLocked + } return nil } @@ -281,12 +282,14 @@ func (m *Mysql) Lock() error { } func (m *Mysql) Unlock() error { - if !m.isLocked { + if !m.isLocked.Load() { return nil } if m.config.NoLock { - m.isLocked = false + if !m.isLocked.CAS(true, false) { + return database.ErrNotLocked + } return nil } @@ -305,7 +308,9 @@ func (m *Mysql) Unlock() error { // in which case isLocked should be true until the timeout expires -- synchronizing // these states is likely not worth trying to do; reconsider the necessity of isLocked. - m.isLocked = false + if !m.isLocked.CAS(true, false) { + return database.ErrNotLocked + } return nil } diff --git a/database/pgx/pgx.go b/database/pgx/pgx.go index 423ac1a08..666a89ef3 100644 --- a/database/pgx/pgx.go +++ b/database/pgx/pgx.go @@ -6,6 +6,7 @@ import ( "context" "database/sql" "fmt" + "go.uber.org/atomic" "io" "io/ioutil" nurl "net/url" @@ -58,7 +59,7 @@ type Postgres struct { // Locking and unlocking need to use the same connection conn *sql.Conn db *sql.DB - isLocked bool + isLocked atomic.Bool // Open and WithInstance need to guarantee that config is never nil config *Config @@ -220,7 +221,7 @@ func (p *Postgres) Close() error { // https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS func (p *Postgres) Lock() error { - if p.isLocked { + if p.isLocked.Load() { return database.ErrLocked } @@ -235,12 +236,14 @@ func (p *Postgres) Lock() error { return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} } - p.isLocked = true + if !p.isLocked.CAS(false, true) { + return database.ErrLocked + } return nil } func (p *Postgres) Unlock() error { - if !p.isLocked { + if !p.isLocked.Load() { return nil } @@ -253,7 +256,9 @@ func (p *Postgres) Unlock() error { if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { return &database.Error{OrigErr: err, Query: []byte(query)} } - p.isLocked = false + if !p.isLocked.CAS(true, false) { + return database.ErrNotLocked + } return nil } diff --git a/database/postgres/postgres.go b/database/postgres/postgres.go index 0e384fe36..9c7077f2f 100644 --- a/database/postgres/postgres.go +++ b/database/postgres/postgres.go @@ -6,6 +6,7 @@ import ( "context" "database/sql" "fmt" + "go.uber.org/atomic" "io" "io/ioutil" nurl "net/url" @@ -57,7 +58,7 @@ type Postgres struct { // Locking and unlocking need to use the same connection conn *sql.Conn db *sql.DB - isLocked bool + isLocked atomic.Bool // Open and WithInstance need to guarantee that config is never nil config *Config @@ -214,7 +215,7 @@ func (p *Postgres) Close() error { // https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS func (p *Postgres) Lock() error { - if p.isLocked { + if p.isLocked.Load() { return database.ErrLocked } @@ -229,12 +230,14 @@ func (p *Postgres) Lock() error { return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} } - p.isLocked = true + if !p.isLocked.CAS(false, true) { + return database.ErrLocked + } return nil } func (p *Postgres) Unlock() error { - if !p.isLocked { + if !p.isLocked.Load() { return nil } @@ -247,7 +250,9 @@ func (p *Postgres) Unlock() error { if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { return &database.Error{OrigErr: err, Query: []byte(query)} } - p.isLocked = false + if !p.isLocked.CAS(true, false) { + return database.ErrNotLocked + } return nil } diff --git a/database/ql/ql.go b/database/ql/ql.go index 5b2dbe355..1c4c49be6 100644 --- a/database/ql/ql.go +++ b/database/ql/ql.go @@ -4,6 +4,7 @@ import ( "database/sql" "fmt" "github.com/hashicorp/go-multierror" + "go.uber.org/atomic" "io" "io/ioutil" "strings" @@ -34,7 +35,7 @@ type Config struct { type Ql struct { db *sql.DB - isLocked bool + isLocked atomic.Bool config *Config } @@ -166,17 +167,15 @@ func (m *Ql) Drop() (err error) { return nil } func (m *Ql) Lock() error { - if m.isLocked { + if !m.isLocked.CAS(false, true) { return database.ErrLocked } - m.isLocked = true return nil } func (m *Ql) Unlock() error { - if !m.isLocked { - return nil + if !m.isLocked.CAS(true, false) { + return database.ErrNotLocked } - m.isLocked = false return nil } func (m *Ql) Run(migration io.Reader) error { diff --git a/database/redshift/redshift.go b/database/redshift/redshift.go index 1f10a29a4..d4539b8a2 100644 --- a/database/redshift/redshift.go +++ b/database/redshift/redshift.go @@ -6,6 +6,7 @@ import ( "context" "database/sql" "fmt" + "go.uber.org/atomic" "io" "io/ioutil" nurl "net/url" @@ -36,7 +37,7 @@ type Config struct { } type Redshift struct { - isLocked bool + isLocked atomic.Bool conn *sql.Conn db *sql.DB @@ -126,15 +127,16 @@ func (p *Redshift) Close() error { // Redshift does not support advisory lock functions: https://docs.aws.amazon.com/redshift/latest/dg/c_unsupported-postgresql-functions.html func (p *Redshift) Lock() error { - if p.isLocked { + if !p.isLocked.CAS(false, true) { return database.ErrLocked } - p.isLocked = true return nil } func (p *Redshift) Unlock() error { - p.isLocked = false + if !p.isLocked.CAS(true, false) { + return database.ErrNotLocked + } return nil } diff --git a/database/snowflake/snowflake.go b/database/snowflake/snowflake.go index 2ad794cec..53d7ca282 100644 --- a/database/snowflake/snowflake.go +++ b/database/snowflake/snowflake.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "go.uber.org/atomic" "io" "io/ioutil" nurl "net/url" @@ -37,7 +38,7 @@ type Config struct { } type Snowflake struct { - isLocked bool + isLocked atomic.Bool conn *sql.Conn db *sql.DB @@ -158,15 +159,16 @@ func (p *Snowflake) Close() error { } func (p *Snowflake) Lock() error { - if p.isLocked { + if !p.isLocked.CAS(false, true) { return database.ErrLocked } - p.isLocked = true return nil } func (p *Snowflake) Unlock() error { - p.isLocked = false + if !p.isLocked.CAS(true, false) { + return database.ErrNotLocked + } return nil } diff --git a/database/sqlcipher/sqlcipher.go b/database/sqlcipher/sqlcipher.go index 53e97446a..782eed24b 100644 --- a/database/sqlcipher/sqlcipher.go +++ b/database/sqlcipher/sqlcipher.go @@ -3,6 +3,7 @@ package sqlcipher import ( "database/sql" "fmt" + "go.uber.org/atomic" "io" "io/ioutil" nurl "net/url" @@ -34,7 +35,7 @@ type Config struct { type Sqlite struct { db *sql.DB - isLocked bool + isLocked atomic.Bool config *Config } @@ -177,18 +178,16 @@ func (m *Sqlite) Drop() (err error) { } func (m *Sqlite) Lock() error { - if m.isLocked { + if !m.isLocked.CAS(false, true) { return database.ErrLocked } - m.isLocked = true return nil } func (m *Sqlite) Unlock() error { - if !m.isLocked { - return nil + if !m.isLocked.CAS(true, false) { + return database.ErrNotLocked } - m.isLocked = false return nil } diff --git a/database/sqlite3/sqlite3.go b/database/sqlite3/sqlite3.go index 4d40f3ecf..65aa6e74c 100644 --- a/database/sqlite3/sqlite3.go +++ b/database/sqlite3/sqlite3.go @@ -3,6 +3,7 @@ package sqlite3 import ( "database/sql" "fmt" + "go.uber.org/atomic" "io" "io/ioutil" nurl "net/url" @@ -34,7 +35,7 @@ type Config struct { type Sqlite struct { db *sql.DB - isLocked bool + isLocked atomic.Bool config *Config } @@ -177,18 +178,16 @@ func (m *Sqlite) Drop() (err error) { } func (m *Sqlite) Lock() error { - if m.isLocked { + if !m.isLocked.CAS(false, true) { return database.ErrLocked } - m.isLocked = true return nil } func (m *Sqlite) Unlock() error { - if !m.isLocked { - return nil + if !m.isLocked.CAS(true, false) { + return database.ErrNotLocked } - m.isLocked = false return nil } diff --git a/database/sqlserver/sqlserver.go b/database/sqlserver/sqlserver.go index b90619ff9..84fd6c21a 100644 --- a/database/sqlserver/sqlserver.go +++ b/database/sqlserver/sqlserver.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "go.uber.org/atomic" "io" "io/ioutil" nurl "net/url" @@ -47,7 +48,7 @@ type SQLServer struct { // Locking and unlocking need to use the same connection conn *sql.Conn db *sql.DB - isLocked bool + isLocked atomic.Bool // Open and WithInstance need to garantuee that config is never nil config *Config @@ -154,7 +155,7 @@ func (ss *SQLServer) Close() error { // Lock creates an advisory local on the database to prevent multiple migrations from running at the same time. func (ss *SQLServer) Lock() error { - if ss.isLocked { + if ss.isLocked.Load() { return database.ErrLocked } @@ -170,7 +171,9 @@ func (ss *SQLServer) Lock() error { var status mssql.ReturnStatus if _, err = ss.conn.ExecContext(context.Background(), query, aid, &status); err == nil && status > -1 { - ss.isLocked = true + if !ss.isLocked.CAS(false, true) { + return database.ErrLocked + } return nil } else if err != nil { return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} @@ -181,7 +184,7 @@ func (ss *SQLServer) Lock() error { // Unlock froms the migration lock from the database func (ss *SQLServer) Unlock() error { - if !ss.isLocked { + if !ss.isLocked.Load() { return nil } @@ -195,8 +198,9 @@ func (ss *SQLServer) Unlock() error { if _, err := ss.conn.ExecContext(context.Background(), query, aid); err != nil { return &database.Error{OrigErr: err, Query: []byte(query)} } - ss.isLocked = false - + if !ss.isLocked.CAS(true, false) { + return database.ErrNotLocked + } return nil } diff --git a/database/stub/stub.go b/database/stub/stub.go index f05e5443b..238ce8ba6 100644 --- a/database/stub/stub.go +++ b/database/stub/stub.go @@ -1,6 +1,7 @@ package stub import ( + "go.uber.org/atomic" "io" "io/ioutil" "reflect" @@ -19,7 +20,7 @@ type Stub struct { MigrationSequence []string LastRunMigration []byte // todo: make []string IsDirty bool - IsLocked bool + isLocked atomic.Bool Config *Config } @@ -49,15 +50,16 @@ func (s *Stub) Close() error { } func (s *Stub) Lock() error { - if s.IsLocked { + if !s.isLocked.CAS(false, true) { return database.ErrLocked } - s.IsLocked = true return nil } func (s *Stub) Unlock() error { - s.IsLocked = false + if !s.isLocked.CAS(true, false) { + return database.ErrNotLocked + } return nil } From 55902afb252d01c3ec264084042d71352f488f1a Mon Sep 17 00:00:00 2001 From: Alexey Prinkov Date: Mon, 14 Jun 2021 21:11:27 +0300 Subject: [PATCH 02/11] fixing local lock optimization implementation --- database/pgx/pgx.go | 17 +++++++---------- database/postgres/postgres.go | 16 +++++++--------- database/sqlserver/sqlserver.go | 17 ++++++++--------- 3 files changed, 22 insertions(+), 28 deletions(-) diff --git a/database/pgx/pgx.go b/database/pgx/pgx.go index 666a89ef3..153f6a1ae 100644 --- a/database/pgx/pgx.go +++ b/database/pgx/pgx.go @@ -221,44 +221,41 @@ func (p *Postgres) Close() error { // https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS func (p *Postgres) Lock() error { - if p.isLocked.Load() { + if !p.isLocked.CAS(false, true) { return database.ErrLocked } aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) if err != nil { + p.isLocked.Store(false) return err } // This will wait indefinitely until the lock can be acquired. query := `SELECT pg_advisory_lock($1)` if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { + p.isLocked.Store(false) return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} } - - if !p.isLocked.CAS(false, true) { - return database.ErrLocked - } return nil } func (p *Postgres) Unlock() error { - if !p.isLocked.Load() { - return nil + if !p.isLocked.CAS(true, false) { + return database.ErrNotLocked } aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) if err != nil { + p.isLocked.Store(true) return err } query := `SELECT pg_advisory_unlock($1)` if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { + p.isLocked.Store(true) return &database.Error{OrigErr: err, Query: []byte(query)} } - if !p.isLocked.CAS(true, false) { - return database.ErrNotLocked - } return nil } diff --git a/database/postgres/postgres.go b/database/postgres/postgres.go index 9c7077f2f..911f58ba4 100644 --- a/database/postgres/postgres.go +++ b/database/postgres/postgres.go @@ -215,44 +215,42 @@ func (p *Postgres) Close() error { // https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS func (p *Postgres) Lock() error { - if p.isLocked.Load() { + if !p.isLocked.CAS(false, true) { return database.ErrLocked } aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) if err != nil { + p.isLocked.Store(false) return err } // This will wait indefinitely until the lock can be acquired. query := `SELECT pg_advisory_lock($1)` if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { + p.isLocked.Store(false) return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} } - if !p.isLocked.CAS(false, true) { - return database.ErrLocked - } return nil } func (p *Postgres) Unlock() error { - if !p.isLocked.Load() { - return nil + if !p.isLocked.CAS(true, false) { + return database.ErrNotLocked } aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) if err != nil { + p.isLocked.Store(true) return err } query := `SELECT pg_advisory_unlock($1)` if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { + p.isLocked.Store(true) return &database.Error{OrigErr: err, Query: []byte(query)} } - if !p.isLocked.CAS(true, false) { - return database.ErrNotLocked - } return nil } diff --git a/database/sqlserver/sqlserver.go b/database/sqlserver/sqlserver.go index 84fd6c21a..bb5f2f5dc 100644 --- a/database/sqlserver/sqlserver.go +++ b/database/sqlserver/sqlserver.go @@ -155,7 +155,7 @@ func (ss *SQLServer) Close() error { // Lock creates an advisory local on the database to prevent multiple migrations from running at the same time. func (ss *SQLServer) Lock() error { - if ss.isLocked.Load() { + if !ss.isLocked.CAS(false, true) { return database.ErrLocked } @@ -171,36 +171,35 @@ func (ss *SQLServer) Lock() error { var status mssql.ReturnStatus if _, err = ss.conn.ExecContext(context.Background(), query, aid, &status); err == nil && status > -1 { - if !ss.isLocked.CAS(false, true) { - return database.ErrLocked - } return nil } else if err != nil { + ss.isLocked.Store(false) return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} } else { + ss.isLocked.Store(false) return &database.Error{Err: fmt.Sprintf("try lock failed with error %v: %v", status, lockErrorMap[status]), Query: []byte(query)} } } // Unlock froms the migration lock from the database func (ss *SQLServer) Unlock() error { - if !ss.isLocked.Load() { - return nil + if !ss.isLocked.CAS(true, false) { + return database.ErrNotLocked } aid, err := database.GenerateAdvisoryLockId(ss.config.DatabaseName, ss.config.SchemaName) if err != nil { + ss.isLocked.Store(true) return err } // MS Docs: sp_releaseapplock: https://docs.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sp-releaseapplock-transact-sql?view=sql-server-2017 query := `EXEC sp_releaseapplock @Resource = @p1, @LockOwner = 'Session'` if _, err := ss.conn.ExecContext(context.Background(), query, aid); err != nil { + ss.isLocked.Store(true) return &database.Error{OrigErr: err, Query: []byte(query)} } - if !ss.isLocked.CAS(true, false) { - return database.ErrNotLocked - } + return nil } From 1598b1ef73753c16a6959c2e83c958c985e308b4 Mon Sep 17 00:00:00 2001 From: Alexey Prinkov Date: Tue, 15 Jun 2021 22:44:40 +0300 Subject: [PATCH 03/11] fixing mysql local lock optimization implementation --- database/mysql/mysql.go | 31 ++++++++++++++----------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/database/mysql/mysql.go b/database/mysql/mysql.go index 98783ec99..56840d06d 100644 --- a/database/mysql/mysql.go +++ b/database/mysql/mysql.go @@ -252,55 +252,55 @@ func (m *Mysql) Close() error { } func (m *Mysql) Lock() error { + if !m.isLocked.CAS(false, true) { + return database.ErrLocked + } + if m.config.NoLock { - if !m.isLocked.CAS(false, true) { - return database.ErrLocked - } return nil } aid, err := database.GenerateAdvisoryLockId( fmt.Sprintf("%s:%s", m.config.DatabaseName, m.config.MigrationsTable)) if err != nil { + m.isLocked.Store(false) return err } query := "SELECT GET_LOCK(?, 10)" var success bool if err := m.conn.QueryRowContext(context.Background(), query, aid).Scan(&success); err != nil { + m.isLocked.Store(false) return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} } - if success { - if !m.isLocked.CAS(false, true) { - return database.ErrLocked - } - return nil + if !success { + m.isLocked.Store(false) + return database.ErrLocked } - return database.ErrLocked + return nil } func (m *Mysql) Unlock() error { - if !m.isLocked.Load() { - return nil + if !m.isLocked.CAS(true, false) { + return database.ErrNotLocked } if m.config.NoLock { - if !m.isLocked.CAS(true, false) { - return database.ErrNotLocked - } return nil } aid, err := database.GenerateAdvisoryLockId( fmt.Sprintf("%s:%s", m.config.DatabaseName, m.config.MigrationsTable)) if err != nil { + m.isLocked.Store(true) return err } query := `SELECT RELEASE_LOCK(?)` if _, err := m.conn.ExecContext(context.Background(), query, aid); err != nil { + m.isLocked.Store(true) return &database.Error{OrigErr: err, Query: []byte(query)} } @@ -308,9 +308,6 @@ func (m *Mysql) Unlock() error { // in which case isLocked should be true until the timeout expires -- synchronizing // these states is likely not worth trying to do; reconsider the necessity of isLocked. - if !m.isLocked.CAS(true, false) { - return database.ErrNotLocked - } return nil } From 93b3798fad173752031136e5f0300986db406405 Mon Sep 17 00:00:00 2001 From: Alexey Prinkov Date: Tue, 15 Jun 2021 22:51:49 +0300 Subject: [PATCH 04/11] fixing cockroachdb local lock optimization implementation --- database/cockroachdb/cockroachdb.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/database/cockroachdb/cockroachdb.go b/database/cockroachdb/cockroachdb.go index 582bd96b4..7b7778698 100644 --- a/database/cockroachdb/cockroachdb.go +++ b/database/cockroachdb/cockroachdb.go @@ -153,15 +153,21 @@ func (c *CockroachDb) Close() error { // Locking is done manually with a separate lock table. Implementing advisory locks in CRDB is being discussed // See: https://github.com/cockroachdb/cockroach/issues/13546 func (c *CockroachDb) Lock() error { + if !c.isLocked.CAS(false, true) { + return database.ErrLocked + } + err := crdb.ExecuteTx(context.Background(), c.db, nil, func(tx *sql.Tx) (err error) { aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName) if err != nil { + c.isLocked.Store(false) return err } query := "SELECT * FROM " + c.config.LockTable + " WHERE lock_id = $1" rows, err := tx.Query(query, aid) if err != nil { + c.isLocked.Store(false) return database.Error{OrigErr: err, Err: "failed to fetch migration lock", Query: []byte(query)} } defer func() { @@ -173,11 +179,13 @@ func (c *CockroachDb) Lock() error { // If row exists at all, lock is present locked := rows.Next() if locked && !c.config.ForceLock { + c.isLocked.Store(false) return database.ErrLocked } query = "INSERT INTO " + c.config.LockTable + " (lock_id) VALUES ($1)" if _, err := tx.Exec(query, aid); err != nil { + c.isLocked.Store(false) return database.Error{OrigErr: err, Err: "failed to set migration lock", Query: []byte(query)} } @@ -185,11 +193,9 @@ func (c *CockroachDb) Lock() error { }) if err != nil { + c.isLocked.Store(false) return err } else { - if !c.isLocked.CAS(false, true) { - return database.ErrLocked - } return nil } } @@ -197,8 +203,13 @@ func (c *CockroachDb) Lock() error { // Locking is done manually with a separate lock table. Implementing advisory locks in CRDB is being discussed // See: https://github.com/cockroachdb/cockroach/issues/13546 func (c *CockroachDb) Unlock() error { + if !c.isLocked.CAS(true, false) { + return database.ErrNotLocked + } + aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName) if err != nil { + c.isLocked.Store(true) return err } @@ -211,18 +222,14 @@ func (c *CockroachDb) Unlock() error { // https://github.com/cockroachdb/cockroach/blob/master/pkg/sql/pgwire/pgerror/codes.go if e.Code == "42P01" { // On drops, the lock table is fully removed; This is fine, and is a valid "unlocked" state for the schema - if !c.isLocked.CAS(true, false) { - return database.ErrNotLocked - } return nil } } + + c.isLocked.Store(true) return database.Error{OrigErr: err, Err: "failed to release migration lock", Query: []byte(query)} } - if !c.isLocked.CAS(true, false) { - return database.ErrNotLocked - } return nil } From c1da4a8ee3aec9ba4224f1588dd648e975bbf415 Mon Sep 17 00:00:00 2001 From: Alexey Prinkov Date: Wed, 16 Jun 2021 12:26:07 +0300 Subject: [PATCH 05/11] CAS wrapper to automatically restore --- database/cockroachdb/cockroachdb.go | 111 ++++++++++++---------------- database/mysql/mysql.go | 84 ++++++++++----------- database/pgx/pgx.go | 54 ++++++-------- database/postgres/postgres.go | 54 ++++++-------- database/sqlserver/sqlserver.go | 70 ++++++++---------- database/util.go | 14 ++++ 6 files changed, 175 insertions(+), 212 deletions(-) diff --git a/database/cockroachdb/cockroachdb.go b/database/cockroachdb/cockroachdb.go index 7b7778698..935131ab3 100644 --- a/database/cockroachdb/cockroachdb.go +++ b/database/cockroachdb/cockroachdb.go @@ -153,84 +153,67 @@ func (c *CockroachDb) Close() error { // Locking is done manually with a separate lock table. Implementing advisory locks in CRDB is being discussed // See: https://github.com/cockroachdb/cockroach/issues/13546 func (c *CockroachDb) Lock() error { - if !c.isLocked.CAS(false, true) { - return database.ErrLocked - } - - err := crdb.ExecuteTx(context.Background(), c.db, nil, func(tx *sql.Tx) (err error) { - aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName) - if err != nil { - c.isLocked.Store(false) - return err - } - - query := "SELECT * FROM " + c.config.LockTable + " WHERE lock_id = $1" - rows, err := tx.Query(query, aid) - if err != nil { - c.isLocked.Store(false) - return database.Error{OrigErr: err, Err: "failed to fetch migration lock", Query: []byte(query)} - } - defer func() { - if errClose := rows.Close(); errClose != nil { - err = multierror.Append(err, errClose) + return database.CasRestoreOnErr(&c.isLocked, false, true, database.ErrLocked, func() (err error) { + return crdb.ExecuteTx(context.Background(), c.db, nil, func(tx *sql.Tx) (err error) { + aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName) + if err != nil { + return err } - }() - // If row exists at all, lock is present - locked := rows.Next() - if locked && !c.config.ForceLock { - c.isLocked.Store(false) - return database.ErrLocked - } + query := "SELECT * FROM " + c.config.LockTable + " WHERE lock_id = $1" + rows, err := tx.Query(query, aid) + if err != nil { + return database.Error{OrigErr: err, Err: "failed to fetch migration lock", Query: []byte(query)} + } + defer func() { + if errClose := rows.Close(); errClose != nil { + err = multierror.Append(err, errClose) + } + }() + + // If row exists at all, lock is present + locked := rows.Next() + if locked && !c.config.ForceLock { + return database.ErrLocked + } - query = "INSERT INTO " + c.config.LockTable + " (lock_id) VALUES ($1)" - if _, err := tx.Exec(query, aid); err != nil { - c.isLocked.Store(false) - return database.Error{OrigErr: err, Err: "failed to set migration lock", Query: []byte(query)} - } + query = "INSERT INTO " + c.config.LockTable + " (lock_id) VALUES ($1)" + if _, err := tx.Exec(query, aid); err != nil { + return database.Error{OrigErr: err, Err: "failed to set migration lock", Query: []byte(query)} + } - return nil + return nil + }) }) - - if err != nil { - c.isLocked.Store(false) - return err - } else { - return nil - } } // Locking is done manually with a separate lock table. Implementing advisory locks in CRDB is being discussed // See: https://github.com/cockroachdb/cockroach/issues/13546 func (c *CockroachDb) Unlock() error { - if !c.isLocked.CAS(true, false) { - return database.ErrNotLocked - } - - aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName) - if err != nil { - c.isLocked.Store(true) - return err - } + return database.CasRestoreOnErr(&c.isLocked, true, false, database.ErrNotLocked, func() (err error) { + aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName) + if err != nil { + return err + } - // In the event of an implementation (non-migration) error, it is possible for the lock to not be released. Until - // a better locking mechanism is added, a manual purging of the lock table may be required in such circumstances - query := "DELETE FROM " + c.config.LockTable + " WHERE lock_id = $1" - if _, err := c.db.Exec(query, aid); err != nil { - if e, ok := err.(*pq.Error); ok { - // 42P01 is "UndefinedTableError" in CockroachDB - // https://github.com/cockroachdb/cockroach/blob/master/pkg/sql/pgwire/pgerror/codes.go - if e.Code == "42P01" { - // On drops, the lock table is fully removed; This is fine, and is a valid "unlocked" state for the schema - return nil + // In the event of an implementation (non-migration) error, it is possible for the lock to not be released. Until + // a better locking mechanism is added, a manual purging of the lock table may be required in such circumstances + query := "DELETE FROM " + c.config.LockTable + " WHERE lock_id = $1" + if _, err := c.db.Exec(query, aid); err != nil { + if e, ok := err.(*pq.Error); ok { + // 42P01 is "UndefinedTableError" in CockroachDB + // https://github.com/cockroachdb/cockroach/blob/master/pkg/sql/pgwire/pgerror/codes.go + if e.Code == "42P01" { + // On drops, the lock table is fully removed; This is fine, and is a valid "unlocked" state for the schema + return nil + } } - } - c.isLocked.Store(true) - return database.Error{OrigErr: err, Err: "failed to release migration lock", Query: []byte(query)} - } + return database.Error{OrigErr: err, Err: "failed to release migration lock", Query: []byte(query)} + } - return nil + return nil + }) } func (c *CockroachDb) Run(migration io.Reader) error { diff --git a/database/mysql/mysql.go b/database/mysql/mysql.go index 56840d06d..29bb9a276 100644 --- a/database/mysql/mysql.go +++ b/database/mysql/mysql.go @@ -252,63 +252,53 @@ func (m *Mysql) Close() error { } func (m *Mysql) Lock() error { - if !m.isLocked.CAS(false, true) { - return database.ErrLocked - } - - if m.config.NoLock { - return nil - } - - aid, err := database.GenerateAdvisoryLockId( - fmt.Sprintf("%s:%s", m.config.DatabaseName, m.config.MigrationsTable)) - if err != nil { - m.isLocked.Store(false) - return err - } + return database.CasRestoreOnErr(&m.isLocked, false, true, database.ErrLocked, func() error { + if m.config.NoLock { + return nil + } + aid, err := database.GenerateAdvisoryLockId( + fmt.Sprintf("%s:%s", m.config.DatabaseName, m.config.MigrationsTable)) + if err != nil { + return err + } - query := "SELECT GET_LOCK(?, 10)" - var success bool - if err := m.conn.QueryRowContext(context.Background(), query, aid).Scan(&success); err != nil { - m.isLocked.Store(false) - return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} - } + query := "SELECT GET_LOCK(?, 10)" + var success bool + if err := m.conn.QueryRowContext(context.Background(), query, aid).Scan(&success); err != nil { + return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} + } - if !success { - m.isLocked.Store(false) - return database.ErrLocked - } + if !success { + return database.ErrLocked + } - return nil + return nil + }) } func (m *Mysql) Unlock() error { - if !m.isLocked.CAS(true, false) { - return database.ErrNotLocked - } - - if m.config.NoLock { - return nil - } + return database.CasRestoreOnErr(&m.isLocked, true, false, database.ErrNotLocked, func() error { + if m.config.NoLock { + return nil + } - aid, err := database.GenerateAdvisoryLockId( - fmt.Sprintf("%s:%s", m.config.DatabaseName, m.config.MigrationsTable)) - if err != nil { - m.isLocked.Store(true) - return err - } + aid, err := database.GenerateAdvisoryLockId( + fmt.Sprintf("%s:%s", m.config.DatabaseName, m.config.MigrationsTable)) + if err != nil { + return err + } - query := `SELECT RELEASE_LOCK(?)` - if _, err := m.conn.ExecContext(context.Background(), query, aid); err != nil { - m.isLocked.Store(true) - return &database.Error{OrigErr: err, Query: []byte(query)} - } + query := `SELECT RELEASE_LOCK(?)` + if _, err := m.conn.ExecContext(context.Background(), query, aid); err != nil { + return &database.Error{OrigErr: err, Query: []byte(query)} + } - // NOTE: RELEASE_LOCK could return NULL or (or 0 if the code is changed), - // in which case isLocked should be true until the timeout expires -- synchronizing - // these states is likely not worth trying to do; reconsider the necessity of isLocked. + // NOTE: RELEASE_LOCK could return NULL or (or 0 if the code is changed), + // in which case isLocked should be true until the timeout expires -- synchronizing + // these states is likely not worth trying to do; reconsider the necessity of isLocked. - return nil + return nil + }) } func (m *Mysql) Run(migration io.Reader) error { diff --git a/database/pgx/pgx.go b/database/pgx/pgx.go index 153f6a1ae..fe709ef04 100644 --- a/database/pgx/pgx.go +++ b/database/pgx/pgx.go @@ -221,42 +221,34 @@ func (p *Postgres) Close() error { // https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS func (p *Postgres) Lock() error { - if !p.isLocked.CAS(false, true) { - return database.ErrLocked - } - - aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) - if err != nil { - p.isLocked.Store(false) - return err - } + return database.CasRestoreOnErr(&p.isLocked, false, true, database.ErrLocked, func() error { + aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) + if err != nil { + return err + } - // This will wait indefinitely until the lock can be acquired. - query := `SELECT pg_advisory_lock($1)` - if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { - p.isLocked.Store(false) - return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} - } - return nil + // This will wait indefinitely until the lock can be acquired. + query := `SELECT pg_advisory_lock($1)` + if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { + return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} + } + return nil + }) } func (p *Postgres) Unlock() error { - if !p.isLocked.CAS(true, false) { - return database.ErrNotLocked - } - - aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) - if err != nil { - p.isLocked.Store(true) - return err - } + return database.CasRestoreOnErr(&p.isLocked, true, false, database.ErrNotLocked, func() error { + aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) + if err != nil { + return err + } - query := `SELECT pg_advisory_unlock($1)` - if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { - p.isLocked.Store(true) - return &database.Error{OrigErr: err, Query: []byte(query)} - } - return nil + query := `SELECT pg_advisory_unlock($1)` + if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { + return &database.Error{OrigErr: err, Query: []byte(query)} + } + return nil + }) } func (p *Postgres) Run(migration io.Reader) error { diff --git a/database/postgres/postgres.go b/database/postgres/postgres.go index 911f58ba4..d59bfb524 100644 --- a/database/postgres/postgres.go +++ b/database/postgres/postgres.go @@ -215,43 +215,35 @@ func (p *Postgres) Close() error { // https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS func (p *Postgres) Lock() error { - if !p.isLocked.CAS(false, true) { - return database.ErrLocked - } - - aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) - if err != nil { - p.isLocked.Store(false) - return err - } + return database.CasRestoreOnErr(&p.isLocked, false, true, database.ErrLocked, func() error { + aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) + if err != nil { + return err + } - // This will wait indefinitely until the lock can be acquired. - query := `SELECT pg_advisory_lock($1)` - if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { - p.isLocked.Store(false) - return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} - } + // This will wait indefinitely until the lock can be acquired. + query := `SELECT pg_advisory_lock($1)` + if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { + return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} + } - return nil + return nil + }) } func (p *Postgres) Unlock() error { - if !p.isLocked.CAS(true, false) { - return database.ErrNotLocked - } - - aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) - if err != nil { - p.isLocked.Store(true) - return err - } + return database.CasRestoreOnErr(&p.isLocked, true, false, database.ErrNotLocked, func() error { + aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) + if err != nil { + return err + } - query := `SELECT pg_advisory_unlock($1)` - if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { - p.isLocked.Store(true) - return &database.Error{OrigErr: err, Query: []byte(query)} - } - return nil + query := `SELECT pg_advisory_unlock($1)` + if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { + return &database.Error{OrigErr: err, Query: []byte(query)} + } + return nil + }) } func (p *Postgres) Run(migration io.Reader) error { diff --git a/database/sqlserver/sqlserver.go b/database/sqlserver/sqlserver.go index bb5f2f5dc..0f8252f3e 100644 --- a/database/sqlserver/sqlserver.go +++ b/database/sqlserver/sqlserver.go @@ -155,52 +155,44 @@ func (ss *SQLServer) Close() error { // Lock creates an advisory local on the database to prevent multiple migrations from running at the same time. func (ss *SQLServer) Lock() error { - if !ss.isLocked.CAS(false, true) { - return database.ErrLocked - } - - aid, err := database.GenerateAdvisoryLockId(ss.config.DatabaseName, ss.config.SchemaName) - if err != nil { - return err - } - - // This will either obtain the lock immediately and return true, - // or return false if the lock cannot be acquired immediately. - // MS Docs: sp_getapplock: https://docs.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sp-getapplock-transact-sql?view=sql-server-2017 - query := `EXEC sp_getapplock @Resource = @p1, @LockMode = 'Update', @LockOwner = 'Session', @LockTimeout = 0` + return database.CasRestoreOnErr(&ss.isLocked, false, true, database.ErrLocked, func() error { + aid, err := database.GenerateAdvisoryLockId(ss.config.DatabaseName, ss.config.SchemaName) + if err != nil { + return err + } - var status mssql.ReturnStatus - if _, err = ss.conn.ExecContext(context.Background(), query, aid, &status); err == nil && status > -1 { - return nil - } else if err != nil { - ss.isLocked.Store(false) - return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} - } else { - ss.isLocked.Store(false) - return &database.Error{Err: fmt.Sprintf("try lock failed with error %v: %v", status, lockErrorMap[status]), Query: []byte(query)} - } + // This will either obtain the lock immediately and return true, + // or return false if the lock cannot be acquired immediately. + // MS Docs: sp_getapplock: https://docs.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sp-getapplock-transact-sql?view=sql-server-2017 + query := `EXEC sp_getapplock @Resource = @p1, @LockMode = 'Update', @LockOwner = 'Session', @LockTimeout = 0` + + var status mssql.ReturnStatus + if _, err = ss.conn.ExecContext(context.Background(), query, aid, &status); err == nil && status > -1 { + return nil + } else if err != nil { + return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} + } else { + return &database.Error{Err: fmt.Sprintf("try lock failed with error %v: %v", status, lockErrorMap[status]), Query: []byte(query)} + } + }) } // Unlock froms the migration lock from the database func (ss *SQLServer) Unlock() error { - if !ss.isLocked.CAS(true, false) { - return database.ErrNotLocked - } - - aid, err := database.GenerateAdvisoryLockId(ss.config.DatabaseName, ss.config.SchemaName) - if err != nil { - ss.isLocked.Store(true) - return err - } + return database.CasRestoreOnErr(&ss.isLocked, true, false, database.ErrNotLocked, func() error { + aid, err := database.GenerateAdvisoryLockId(ss.config.DatabaseName, ss.config.SchemaName) + if err != nil { + return err + } - // MS Docs: sp_releaseapplock: https://docs.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sp-releaseapplock-transact-sql?view=sql-server-2017 - query := `EXEC sp_releaseapplock @Resource = @p1, @LockOwner = 'Session'` - if _, err := ss.conn.ExecContext(context.Background(), query, aid); err != nil { - ss.isLocked.Store(true) - return &database.Error{OrigErr: err, Query: []byte(query)} - } + // MS Docs: sp_releaseapplock: https://docs.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sp-releaseapplock-transact-sql?view=sql-server-2017 + query := `EXEC sp_releaseapplock @Resource = @p1, @LockOwner = 'Session'` + if _, err := ss.conn.ExecContext(context.Background(), query, aid); err != nil { + return &database.Error{OrigErr: err, Query: []byte(query)} + } - return nil + return nil + }) } // Run the migrations for the database diff --git a/database/util.go b/database/util.go index 976ad3534..30d6a63b8 100644 --- a/database/util.go +++ b/database/util.go @@ -2,6 +2,7 @@ package database import ( "fmt" + "go.uber.org/atomic" "hash/crc32" "strings" ) @@ -17,3 +18,16 @@ func GenerateAdvisoryLockId(databaseName string, additionalNames ...string) (str sum = sum * uint32(advisoryLockIDSalt) return fmt.Sprint(sum), nil } + +// CasRestoreOnErr CAS wrapper to automatically restore the lock state on error +func CasRestoreOnErr(lock *atomic.Bool, o, n bool, casErr error, f func() error) error { + if !lock.CAS(o, n) { + return casErr + } + if err := f(); err != nil { + // Automatically unlock/lock on error + lock.Store(o) + return err + } + return nil +} \ No newline at end of file From 312d98d1171b17f3abb17e5b335b7715ced018af Mon Sep 17 00:00:00 2001 From: Alexey Prinkov Date: Wed, 16 Jun 2021 12:56:51 +0300 Subject: [PATCH 06/11] added test for cas with restore --- database/util_test.go | 62 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/database/util_test.go b/database/util_test.go index 13cba46d8..82d89b74c 100644 --- a/database/util_test.go +++ b/database/util_test.go @@ -1,6 +1,7 @@ package database import ( + "go.uber.org/atomic" "testing" ) @@ -45,3 +46,64 @@ func TestGenerateAdvisoryLockId(t *testing.T) { }) } } + +func TestCasRestoreOnErr(t *testing.T) { + testcases := []struct { + name string + lock *atomic.Bool + from bool + to bool + casErr error + fErr error + expectLock bool + expectError error + }{ + { + name: "Test positive CAS lock", + lock: atomic.NewBool(false), + from: false, + to: true, + casErr: ErrLocked, + fErr: nil, + expectError: nil, + expectLock: true, + + }, + { + name: "Test negative CAS lock", + lock: atomic.NewBool(true), + from: false, + to: true, + casErr: ErrLocked, + fErr: nil, + expectLock: true, + expectError: ErrLocked, + }, + { + name: "Test negative with callback lock", + lock: atomic.NewBool(false), + from: false, + to: true, + casErr: ErrLocked, + fErr: ErrNotLocked, + expectLock: false, + expectError: ErrNotLocked, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + err := CasRestoreOnErr(tc.lock, tc.from, tc.to, tc.casErr, func() error { + return tc.fErr + }) + + if tc.lock.Load() != tc.expectLock { + t.Error("Incorrect state of lock") + } + + if err != tc.expectError { + t.Error("Incorrect error value returned") + } + }) + } +} From 0b282a5fc78dce2ff3f38bccc9a9828bc8f5ff83 Mon Sep 17 00:00:00 2001 From: Alexey Prinkov Date: Wed, 16 Jun 2021 13:01:57 +0300 Subject: [PATCH 07/11] added atomic lock to sqlite & mongo --- database/mongodb/mongodb.go | 98 ++++++++++++++++++++----------------- database/sqlite/sqlite.go | 11 ++--- database/util.go | 2 +- 3 files changed, 58 insertions(+), 53 deletions(-) diff --git a/database/mongodb/mongodb.go b/database/mongodb/mongodb.go index 17ca804f2..7c02c7d99 100644 --- a/database/mongodb/mongodb.go +++ b/database/mongodb/mongodb.go @@ -10,6 +10,7 @@ import ( "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/x/mongo/driver/connstring" + "go.uber.org/atomic" "io" "io/ioutil" "net/url" @@ -40,9 +41,10 @@ var ( ) type Mongo struct { - client *mongo.Client - db *mongo.Database - config *Config + client *mongo.Client + db *mongo.Database + config *Config + isLocked atomic.Bool } type Locking struct { @@ -327,55 +329,59 @@ func (m *Mongo) ensureVersionTable() (err error) { // Utilizes advisory locking on the config.LockingCollection collection // This uses a unique index on the `locking_key` field. func (m *Mongo) Lock() error { - if !m.config.Locking.Enabled { - return nil - } - pid := os.Getpid() - hostname, err := os.Hostname() - if err != nil { - hostname = fmt.Sprintf("Could not determine hostname. Error: %s", err.Error()) - } - - newLockObj := lockObj{ - Key: lockKeyUniqueValue, - Pid: pid, - Hostname: hostname, - CreatedAt: time.Now(), - } - operation := func() error { - timeout, cancelFunc := context.WithTimeout(context.Background(), contextWaitTimeout) - _, err := m.db.Collection(m.config.Locking.CollectionName).InsertOne(timeout, newLockObj) - defer cancelFunc() - return err - } - exponentialBackOff := backoff.NewExponentialBackOff() - duration := time.Duration(m.config.Locking.Timeout) * time.Second - exponentialBackOff.MaxElapsedTime = duration - exponentialBackOff.MaxInterval = time.Duration(m.config.Locking.Interval) * time.Second + return database.CasRestoreOnErr(&m.isLocked, false, true, database.ErrLocked, func() error { + if !m.config.Locking.Enabled { + return nil + } + pid := os.Getpid() + hostname, err := os.Hostname() + if err != nil { + hostname = fmt.Sprintf("Could not determine hostname. Error: %s", err.Error()) + } - err = backoff.Retry(operation, exponentialBackOff) - if err != nil { - return database.ErrLocked - } + newLockObj := lockObj{ + Key: lockKeyUniqueValue, + Pid: pid, + Hostname: hostname, + CreatedAt: time.Now(), + } + operation := func() error { + timeout, cancelFunc := context.WithTimeout(context.Background(), contextWaitTimeout) + _, err := m.db.Collection(m.config.Locking.CollectionName).InsertOne(timeout, newLockObj) + defer cancelFunc() + return err + } + exponentialBackOff := backoff.NewExponentialBackOff() + duration := time.Duration(m.config.Locking.Timeout) * time.Second + exponentialBackOff.MaxElapsedTime = duration + exponentialBackOff.MaxInterval = time.Duration(m.config.Locking.Interval) * time.Second - return nil + err = backoff.Retry(operation, exponentialBackOff) + if err != nil { + return database.ErrLocked + } + return nil + }) } + func (m *Mongo) Unlock() error { - if !m.config.Locking.Enabled { - return nil - } + return database.CasRestoreOnErr(&m.isLocked, true, false, database.ErrNotLocked, func() error { + if !m.config.Locking.Enabled { + return nil + } - filter := findFilter{ - Key: lockKeyUniqueValue, - } + filter := findFilter{ + Key: lockKeyUniqueValue, + } - ctx, cancel := context.WithTimeout(context.Background(), contextWaitTimeout) - _, err := m.db.Collection(m.config.Locking.CollectionName).DeleteMany(ctx, filter) - defer cancel() + ctx, cancel := context.WithTimeout(context.Background(), contextWaitTimeout) + _, err := m.db.Collection(m.config.Locking.CollectionName).DeleteMany(ctx, filter) + defer cancel() - if err != nil { - return err - } - return nil + if err != nil { + return err + } + return nil + }) } diff --git a/database/sqlite/sqlite.go b/database/sqlite/sqlite.go index 581b87d28..d33c60e46 100644 --- a/database/sqlite/sqlite.go +++ b/database/sqlite/sqlite.go @@ -3,6 +3,7 @@ package sqlite import ( "database/sql" "fmt" + "go.uber.org/atomic" "io" "io/ioutil" nurl "net/url" @@ -34,7 +35,7 @@ type Config struct { type Sqlite struct { db *sql.DB - isLocked bool + isLocked atomic.Bool config *Config } @@ -177,18 +178,16 @@ func (m *Sqlite) Drop() (err error) { } func (m *Sqlite) Lock() error { - if m.isLocked { + if !m.isLocked.CAS(false, true) { return database.ErrLocked } - m.isLocked = true return nil } func (m *Sqlite) Unlock() error { - if !m.isLocked { - return nil + if !m.isLocked.CAS(true, false) { + return database.ErrNotLocked } - m.isLocked = false return nil } diff --git a/database/util.go b/database/util.go index 30d6a63b8..de66d5b80 100644 --- a/database/util.go +++ b/database/util.go @@ -30,4 +30,4 @@ func CasRestoreOnErr(lock *atomic.Bool, o, n bool, casErr error, f func() error) return err } return nil -} \ No newline at end of file +} From c4fd509f69b12a3ea959f47cf69d4ef3b302e7e0 Mon Sep 17 00:00:00 2001 From: Alexey Prinkov Date: Wed, 16 Jun 2021 13:01:57 +0300 Subject: [PATCH 08/11] added atomic lock to sqlite & mongo --- database/mongodb/mongodb.go | 98 ++++++++++++++++++++----------------- database/sqlite/sqlite.go | 11 ++--- database/util.go | 2 +- database/util_test.go | 57 +++++++++++---------- 4 files changed, 86 insertions(+), 82 deletions(-) diff --git a/database/mongodb/mongodb.go b/database/mongodb/mongodb.go index 17ca804f2..7c02c7d99 100644 --- a/database/mongodb/mongodb.go +++ b/database/mongodb/mongodb.go @@ -10,6 +10,7 @@ import ( "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/x/mongo/driver/connstring" + "go.uber.org/atomic" "io" "io/ioutil" "net/url" @@ -40,9 +41,10 @@ var ( ) type Mongo struct { - client *mongo.Client - db *mongo.Database - config *Config + client *mongo.Client + db *mongo.Database + config *Config + isLocked atomic.Bool } type Locking struct { @@ -327,55 +329,59 @@ func (m *Mongo) ensureVersionTable() (err error) { // Utilizes advisory locking on the config.LockingCollection collection // This uses a unique index on the `locking_key` field. func (m *Mongo) Lock() error { - if !m.config.Locking.Enabled { - return nil - } - pid := os.Getpid() - hostname, err := os.Hostname() - if err != nil { - hostname = fmt.Sprintf("Could not determine hostname. Error: %s", err.Error()) - } - - newLockObj := lockObj{ - Key: lockKeyUniqueValue, - Pid: pid, - Hostname: hostname, - CreatedAt: time.Now(), - } - operation := func() error { - timeout, cancelFunc := context.WithTimeout(context.Background(), contextWaitTimeout) - _, err := m.db.Collection(m.config.Locking.CollectionName).InsertOne(timeout, newLockObj) - defer cancelFunc() - return err - } - exponentialBackOff := backoff.NewExponentialBackOff() - duration := time.Duration(m.config.Locking.Timeout) * time.Second - exponentialBackOff.MaxElapsedTime = duration - exponentialBackOff.MaxInterval = time.Duration(m.config.Locking.Interval) * time.Second + return database.CasRestoreOnErr(&m.isLocked, false, true, database.ErrLocked, func() error { + if !m.config.Locking.Enabled { + return nil + } + pid := os.Getpid() + hostname, err := os.Hostname() + if err != nil { + hostname = fmt.Sprintf("Could not determine hostname. Error: %s", err.Error()) + } - err = backoff.Retry(operation, exponentialBackOff) - if err != nil { - return database.ErrLocked - } + newLockObj := lockObj{ + Key: lockKeyUniqueValue, + Pid: pid, + Hostname: hostname, + CreatedAt: time.Now(), + } + operation := func() error { + timeout, cancelFunc := context.WithTimeout(context.Background(), contextWaitTimeout) + _, err := m.db.Collection(m.config.Locking.CollectionName).InsertOne(timeout, newLockObj) + defer cancelFunc() + return err + } + exponentialBackOff := backoff.NewExponentialBackOff() + duration := time.Duration(m.config.Locking.Timeout) * time.Second + exponentialBackOff.MaxElapsedTime = duration + exponentialBackOff.MaxInterval = time.Duration(m.config.Locking.Interval) * time.Second - return nil + err = backoff.Retry(operation, exponentialBackOff) + if err != nil { + return database.ErrLocked + } + return nil + }) } + func (m *Mongo) Unlock() error { - if !m.config.Locking.Enabled { - return nil - } + return database.CasRestoreOnErr(&m.isLocked, true, false, database.ErrNotLocked, func() error { + if !m.config.Locking.Enabled { + return nil + } - filter := findFilter{ - Key: lockKeyUniqueValue, - } + filter := findFilter{ + Key: lockKeyUniqueValue, + } - ctx, cancel := context.WithTimeout(context.Background(), contextWaitTimeout) - _, err := m.db.Collection(m.config.Locking.CollectionName).DeleteMany(ctx, filter) - defer cancel() + ctx, cancel := context.WithTimeout(context.Background(), contextWaitTimeout) + _, err := m.db.Collection(m.config.Locking.CollectionName).DeleteMany(ctx, filter) + defer cancel() - if err != nil { - return err - } - return nil + if err != nil { + return err + } + return nil + }) } diff --git a/database/sqlite/sqlite.go b/database/sqlite/sqlite.go index 581b87d28..d33c60e46 100644 --- a/database/sqlite/sqlite.go +++ b/database/sqlite/sqlite.go @@ -3,6 +3,7 @@ package sqlite import ( "database/sql" "fmt" + "go.uber.org/atomic" "io" "io/ioutil" nurl "net/url" @@ -34,7 +35,7 @@ type Config struct { type Sqlite struct { db *sql.DB - isLocked bool + isLocked atomic.Bool config *Config } @@ -177,18 +178,16 @@ func (m *Sqlite) Drop() (err error) { } func (m *Sqlite) Lock() error { - if m.isLocked { + if !m.isLocked.CAS(false, true) { return database.ErrLocked } - m.isLocked = true return nil } func (m *Sqlite) Unlock() error { - if !m.isLocked { - return nil + if !m.isLocked.CAS(true, false) { + return database.ErrNotLocked } - m.isLocked = false return nil } diff --git a/database/util.go b/database/util.go index 30d6a63b8..de66d5b80 100644 --- a/database/util.go +++ b/database/util.go @@ -30,4 +30,4 @@ func CasRestoreOnErr(lock *atomic.Bool, o, n bool, casErr error, f func() error) return err } return nil -} \ No newline at end of file +} diff --git a/database/util_test.go b/database/util_test.go index 82d89b74c..6fb8fbb96 100644 --- a/database/util_test.go +++ b/database/util_test.go @@ -49,44 +49,43 @@ func TestGenerateAdvisoryLockId(t *testing.T) { func TestCasRestoreOnErr(t *testing.T) { testcases := []struct { - name string - lock *atomic.Bool - from bool - to bool - casErr error - fErr error - expectLock bool + name string + lock *atomic.Bool + from bool + to bool + casErr error + fErr error + expectLock bool expectError error }{ { - name: "Test positive CAS lock", - lock: atomic.NewBool(false), - from: false, - to: true, - casErr: ErrLocked, - fErr: nil, + name: "Test positive CAS lock", + lock: atomic.NewBool(false), + from: false, + to: true, + casErr: ErrLocked, + fErr: nil, expectError: nil, - expectLock: true, - + expectLock: true, }, { - name: "Test negative CAS lock", - lock: atomic.NewBool(true), - from: false, - to: true, - casErr: ErrLocked, - fErr: nil, - expectLock: true, + name: "Test negative CAS lock", + lock: atomic.NewBool(true), + from: false, + to: true, + casErr: ErrLocked, + fErr: nil, + expectLock: true, expectError: ErrLocked, }, { - name: "Test negative with callback lock", - lock: atomic.NewBool(false), - from: false, - to: true, - casErr: ErrLocked, - fErr: ErrNotLocked, - expectLock: false, + name: "Test negative with callback lock", + lock: atomic.NewBool(false), + from: false, + to: true, + casErr: ErrLocked, + fErr: ErrNotLocked, + expectLock: false, expectError: ErrNotLocked, }, } From 71bfde660f4cd9903117a330dcb083e8c144f987 Mon Sep 17 00:00:00 2001 From: Alexey Prinkov Date: Wed, 16 Jun 2021 23:35:47 +0300 Subject: [PATCH 09/11] fix/refactor test cas restore function --- database/util_test.go | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/database/util_test.go b/database/util_test.go index 6fb8fbb96..bc00b4777 100644 --- a/database/util_test.go +++ b/database/util_test.go @@ -1,6 +1,7 @@ package database import ( + "errors" "go.uber.org/atomic" "testing" ) @@ -48,12 +49,14 @@ func TestGenerateAdvisoryLockId(t *testing.T) { } func TestCasRestoreOnErr(t *testing.T) { + casErr := errors.New("test lock CAS failure") + fErr := errors.New("test callback error") + testcases := []struct { name string lock *atomic.Bool from bool to bool - casErr error fErr error expectLock bool expectError error @@ -63,46 +66,41 @@ func TestCasRestoreOnErr(t *testing.T) { lock: atomic.NewBool(false), from: false, to: true, - casErr: ErrLocked, fErr: nil, - expectError: nil, expectLock: true, + expectError: nil, }, { name: "Test negative CAS lock", lock: atomic.NewBool(true), from: false, to: true, - casErr: ErrLocked, fErr: nil, expectLock: true, - expectError: ErrLocked, + expectError: casErr, }, { name: "Test negative with callback lock", lock: atomic.NewBool(false), from: false, to: true, - casErr: ErrLocked, - fErr: ErrNotLocked, + fErr: fErr, expectLock: false, - expectError: ErrNotLocked, + expectError: fErr, }, } for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - err := CasRestoreOnErr(tc.lock, tc.from, tc.to, tc.casErr, func() error { + if err := CasRestoreOnErr(tc.lock, tc.from, tc.to, casErr, func() error { return tc.fErr - }) + }); err != tc.expectError { + t.Error("Incorrect error value returned") + } if tc.lock.Load() != tc.expectLock { t.Error("Incorrect state of lock") } - - if err != tc.expectError { - t.Error("Incorrect error value returned") - } }) } } From 93fac42d8c9a294dc49dd3b741f564ac0abd73e9 Mon Sep 17 00:00:00 2001 From: Alexey Prinkov Date: Thu, 17 Jun 2021 11:52:08 +0300 Subject: [PATCH 10/11] disable lock for mongo --- database/mongodb/mongodb.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/database/mongodb/mongodb.go b/database/mongodb/mongodb.go index 7c02c7d99..25d57555c 100644 --- a/database/mongodb/mongodb.go +++ b/database/mongodb/mongodb.go @@ -329,10 +329,11 @@ func (m *Mongo) ensureVersionTable() (err error) { // Utilizes advisory locking on the config.LockingCollection collection // This uses a unique index on the `locking_key` field. func (m *Mongo) Lock() error { + if !m.config.Locking.Enabled { + return nil + } + return database.CasRestoreOnErr(&m.isLocked, false, true, database.ErrLocked, func() error { - if !m.config.Locking.Enabled { - return nil - } pid := os.Getpid() hostname, err := os.Hostname() if err != nil { @@ -366,11 +367,11 @@ func (m *Mongo) Lock() error { } func (m *Mongo) Unlock() error { - return database.CasRestoreOnErr(&m.isLocked, true, false, database.ErrNotLocked, func() error { - if !m.config.Locking.Enabled { - return nil - } + if !m.config.Locking.Enabled { + return nil + } + return database.CasRestoreOnErr(&m.isLocked, true, false, database.ErrNotLocked, func() error { filter := findFilter{ Key: lockKeyUniqueValue, } From 11e2c9d5826a949c976d73464058ca6900796114 Mon Sep 17 00:00:00 2001 From: Alexey Prinkov Date: Wed, 23 Jun 2021 23:58:14 +0300 Subject: [PATCH 11/11] mongo local lock before acquiring the db lock --- database/mongodb/mongodb.go | 16 ++++++++-------- database/mongodb/mongodb_test.go | 13 +------------ 2 files changed, 9 insertions(+), 20 deletions(-) diff --git a/database/mongodb/mongodb.go b/database/mongodb/mongodb.go index 25d57555c..3e18fd44b 100644 --- a/database/mongodb/mongodb.go +++ b/database/mongodb/mongodb.go @@ -329,11 +329,11 @@ func (m *Mongo) ensureVersionTable() (err error) { // Utilizes advisory locking on the config.LockingCollection collection // This uses a unique index on the `locking_key` field. func (m *Mongo) Lock() error { - if !m.config.Locking.Enabled { - return nil - } - return database.CasRestoreOnErr(&m.isLocked, false, true, database.ErrLocked, func() error { + if !m.config.Locking.Enabled { + return nil + } + pid := os.Getpid() hostname, err := os.Hostname() if err != nil { @@ -367,11 +367,11 @@ func (m *Mongo) Lock() error { } func (m *Mongo) Unlock() error { - if !m.config.Locking.Enabled { - return nil - } - return database.CasRestoreOnErr(&m.isLocked, true, false, database.ErrNotLocked, func() error { + if !m.config.Locking.Enabled { + return nil + } + filter := findFilter{ Key: lockKeyUniqueValue, } diff --git a/database/mongodb/mongodb_test.go b/database/mongodb/mongodb_test.go index c73da46c4..f15f74113 100644 --- a/database/mongodb/mongodb_test.go +++ b/database/mongodb/mongodb_test.go @@ -221,18 +221,7 @@ func TestLockWorks(t *testing.T) { t.Fatal(err) } - // disable locking, validate wer can lock twice - mc.config.Locking.Enabled = false - err = mc.Lock() - if err != nil { - t.Fatal(err) - } - err = mc.Lock() - if err != nil { - t.Fatal(err) - } - - // re-enable locking, + // enable locking, //try to hit a lock conflict mc.config.Locking.Enabled = true mc.config.Locking.Timeout = 1