Skip to content

Commit

Permalink
Implement PR feedback
Browse files Browse the repository at this point in the history
rename constants
rename query params
add new timeout interval query param
add new feature flag
use timeout context
  • Loading branch information
andyN42 committed Sep 24, 2020
1 parent 7208a16 commit 73760ed
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 41 deletions.
4 changes: 3 additions & 1 deletion database/mongodb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
|------------|---------------------|-------------|
| `x-migrations-collection` | `MigrationsCollection` | Name of the migrations collection |
| `x-transaction-mode` | `TransactionMode` | If set to `true` wrap commands in [transaction](https://docs.mongodb.com/manual/core/transactions). Available only for replica set. Driver is using [strconv.ParseBool](https://golang.org/pkg/strconv/#ParseBool) for parsing|
| `x-advisory-locking` | `true` | Feature flag for advisory locking, if set to false, disable advisory locking |
| `x-advisory-lock-collection` | `migrate_advisory_lock` | The name of the collection to use for advisory locking |
| `x-advisory-lock-backoff-seconds` | `15` | The max time that the advisory lock will wait during exponential backoff if the db is already locked. |
| `x-advisory-lock-timout` | `15` | The max time in seconds that the advisory lock will wait if the db is already locked. |
| `x-advisory-lock-timout-interval` | `10` | The max timeout in seconds interval that the advisory lock will wait if the db is already locked. |
| `dbname` | `DatabaseName` | The name of the database to connect to |
| `user` | | The user to sign in as. Can be omitted |
| `password` | | The user's password. Can be omitted |
Expand Down
105 changes: 65 additions & 40 deletions database/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx"
"go.mongodb.org/mongo-driver/x/mongo/driver/connstring"
"io"
"io/ioutil"
Expand All @@ -28,9 +27,12 @@ func init() {
var DefaultMigrationsCollection = "schema_migrations"

const DefaultLockingCollection = "migrate_advisory_lock" // the collection to use for advisory locking by default.
const LockingKey = "locking_key" // the key to lock on, will have a unique=true index on it
const lockKeyUniqueValue = 0 // the unique value to lock on. If multiple clients try to insert the same key, it will fail (locked).
const LockingBackoffTime = 15 // the default maximum time to wait for a lock to be released
const DefaultLockTimeout = 15 // the default maximum time to wait for a lock to be released.
const DefaultLockTimeoutInterval = 10 // the default maximum intervals time for the locking timout.
const DefaultAdvisoryLockingFlag = true // the default value for the advisory locking feature flag. Default is true.
const LockIndexName = "lock_unique_key" // the name of the index which adds unique constraint to the locking_key field.
const contextWaitTimeout = 5 * time.Second // how long to wait for the request to mongo to block/wait for.

var (
ErrNoDatabaseName = fmt.Errorf("no database name")
Expand All @@ -43,23 +45,28 @@ type Mongo struct {
config *Config
}

type Locking struct {
CollectionName string
Timeout int
UseAdvisoryLocking bool
Interval int
}
type Config struct {
DatabaseName string
MigrationsCollection string
LockingCollection string
LockingBackoffTime int
TransactionMode bool
Locking Locking
}

type versionInfo struct {
Version int `bson:"version"`
Dirty bool `bson:"dirty"`
}

type lockObj struct {
Key int `bson:"locking_key"`
Pid int `bson:"pid"`
Name string `bson:"hostname"`
Key int `bson:"locking_key"`
Pid int `bson:"pid"`
Hostname string `bson:"hostname"`
CreatedAt time.Time `bson:"created_at"`
}
type findFilter struct {
Key int `bson:"locking_key"`
Expand All @@ -75,11 +82,14 @@ func WithInstance(instance *mongo.Client, config *Config) (database.Driver, erro
if len(config.MigrationsCollection) == 0 {
config.MigrationsCollection = DefaultMigrationsCollection
}
if len(config.LockingCollection) == 0 {
config.LockingCollection = DefaultLockingCollection
if len(config.Locking.CollectionName) == 0 {
config.Locking.CollectionName = DefaultLockingCollection
}
if config.Locking.Timeout <= 0 {
config.Locking.Timeout = DefaultLockTimeout
}
if config.LockingBackoffTime <= 0 {
config.LockingBackoffTime = LockingBackoffTime
if config.Locking.Interval <= 0 {
config.Locking.Interval = DefaultLockTimeoutInterval
}

mc := &Mongo{
Expand All @@ -88,8 +98,10 @@ func WithInstance(instance *mongo.Client, config *Config) (database.Driver, erro
config: config,
}

if err := mc.ensureLockTable(); err != nil {
return nil, err
if mc.config.Locking.UseAdvisoryLocking {
if err := mc.ensureLockTable(); err != nil {
return nil, err
}
}
if err := mc.ensureVersionTable(); err != nil {
return nil, err
Expand All @@ -111,22 +123,32 @@ func (m *Mongo) Open(dsn string) (database.Driver, error) {

migrationsCollection := unknown.Get("x-migrations-collection")
lockCollection := unknown.Get("x-advisory-lock-collection")
lockingBackoffTime, _ := strconv.Atoi(unknown.Get("x-advisory-lock-backoff-seconds"))
transactionMode, _ := strconv.ParseBool(unknown.Get("x-transaction-mode"))
advisoryLockingFlag, err := strconv.ParseBool(unknown.Get("x-advisory-locking"))
if err != nil {
advisoryLockingFlag = DefaultAdvisoryLockingFlag
}
lockingTimout, _ := strconv.Atoi(unknown.Get("x-advisory-lock-timeout"))
maxLockingIntervals, _ := strconv.Atoi(unknown.Get("x-advisory-lock-timout-interval"))

client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(dsn))
if err != nil {
return nil, err
}

if err = client.Ping(context.TODO(), nil); err != nil {
return nil, err
}
mc, err := WithInstance(client, &Config{
DatabaseName: uri.Database,
MigrationsCollection: migrationsCollection,
LockingCollection: lockCollection,
LockingBackoffTime: lockingBackoffTime,
TransactionMode: transactionMode,
Locking: Locking{
CollectionName: lockCollection,
Timeout: lockingTimout,
UseAdvisoryLocking: advisoryLockingFlag,
Interval: maxLockingIntervals,
},
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -220,24 +242,13 @@ func (m *Mongo) Drop() error {
return m.db.Drop(context.TODO())
}

// Note that this could possibly have a race condition
// if three migrate processes try to create the index at the exact same time, but it
// takes a while for the first call to build the index (although it's an empty collection, so that may not take long)
// then two of them could return successful, while the index is still building, leading to
// the second and third processes to successfully insert a document (and "acquire" the lock),
// as duplicate keys would be allowed.
//
// This may not be an issue, if the collection is empty, and creating the lock takes next to no time.
//
func (m *Mongo) ensureLockTable() error {
indexes := m.db.Collection(m.config.LockingCollection).Indexes()
indexOptions := options.Index().SetUnique(true).SetName("lock_unique_key")
indexKeys := bsonx.MDoc{
LockingKey: bsonx.Int32(-1),
}
indexes := m.db.Collection(m.config.Locking.CollectionName).Indexes()

indexOptions := options.Index().SetUnique(true).SetName(LockIndexName)
_, err := indexes.CreateOne(context.TODO(), mongo.IndexModel{
Options: indexOptions,
Keys: indexKeys,
Keys: findFilter{Key: -1},
})
if err != nil {
return err
Expand Down Expand Up @@ -275,39 +286,53 @@ func (m *Mongo) ensureVersionTable() (err error) {
// Utilizes advisory locking on the config.LockingCollection collection
// This uses a unique index on the `locking_key` field.
func (m *Mongo) Lock() error {
if !m.config.Locking.UseAdvisoryLocking {
return nil
}
pid := os.Getpid()
hostname, err := os.Hostname()
if err != nil {
hostname = fmt.Sprintf("Could not determine hostname. Error: %s", err.Error())
}

newLockObj := lockObj{
Key: lockKeyUniqueValue,
Pid: pid,
Name: hostname,
Key: lockKeyUniqueValue,
Pid: pid,
Hostname: hostname,
CreatedAt: time.Now(),
}
operation := func() error {
_, err := m.db.Collection(m.config.LockingCollection).InsertOne(context.TODO(), newLockObj)
timeout, cancelFunc := context.WithTimeout(context.Background(), contextWaitTimeout)
_, err := m.db.Collection(m.config.Locking.CollectionName).InsertOne(timeout, newLockObj)
defer cancelFunc()
return err
}
exponentialBackOff := backoff.NewExponentialBackOff()
duration := time.Duration(m.config.LockingBackoffTime) * time.Second
duration := time.Duration(m.config.Locking.Timeout) * time.Second
exponentialBackOff.MaxElapsedTime = duration
exponentialBackOff.MaxInterval = exponentialBackOff.MaxElapsedTime / 10
exponentialBackOff.MaxInterval = time.Duration(m.config.Locking.Interval) * time.Second

err = backoff.Retry(operation, exponentialBackOff)
if err != nil {
return database.ErrLocked
}

return nil

}
func (m *Mongo) Unlock() error {
if !m.config.Locking.UseAdvisoryLocking {
return nil
}

filter := findFilter{
Key: lockKeyUniqueValue,
}
_, err := m.db.Collection(m.config.LockingCollection).DeleteMany(context.TODO(), filter)

ctx, cancel := context.WithTimeout(context.Background(), contextWaitTimeout)
_, err := m.db.Collection(m.config.Locking.CollectionName).DeleteMany(ctx, filter)
defer cancel()

if err != nil {
return err
}
Expand Down
14 changes: 14 additions & 0 deletions database/mongodb/mongodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,21 @@ func TestLockWorks(t *testing.T) {
t.Fatal(err)
}

// disable locking, validate wer can lock twice
mc.config.Locking.UseAdvisoryLocking = false
err = mc.Lock()
if err != nil {
t.Fatal(err)
}
err = mc.Lock()
if err != nil {
t.Fatal(err)
}

// re-enable locking,
//try to hit a lock conflict
mc.config.Locking.UseAdvisoryLocking = true
mc.config.Locking.Timeout = 1
err = mc.Lock()
if err != nil {
t.Fatal(err)
Expand Down

0 comments on commit 73760ed

Please sign in to comment.