Skip to content

Commit

Permalink
feat: allow Go migrations outside of an SQL transaction (#696)
Browse files Browse the repository at this point in the history
Co-authored-by: aeneasr <[email protected]>
  • Loading branch information
alnr and aeneasr authored Jun 26, 2023
1 parent 0d30de8 commit 1c62a8a
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 38 deletions.
8 changes: 8 additions & 0 deletions popx/migration_box.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,5 +249,13 @@ func (fm *MigrationBox) check() error {
return errors.Errorf("migration %s has no corresponding down migration", up.Version)
}
}

for _, m := range fm.Migrations {
for _, n := range m {
if err := n.Valid(); err != nil {
return err
}
}
}
return nil
}
132 changes: 132 additions & 0 deletions popx/migration_box_gomigration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package popx_test
import (
"context"
"database/sql"
"math/rand"
"testing"
"time"

Expand Down Expand Up @@ -160,3 +161,134 @@ func TestGoMigrations(t *testing.T) {
assert.ErrorIs(t, c.Where("i=1").First(tt), sql.ErrNoRows, "%+v", tt)
})
}

func TestIncompatibleRunners(t *testing.T) {
mb, err := popx.NewMigrationBox(empty, popx.NewMigrator(nil, logrusx.New("", ""), nil, 0), popx.WithGoMigrations(
popx.Migrations{
{
Path: "transactional",
Version: "1",
Name: "gomigration_tx",
Direction: "up",
Type: "go",
DBType: "all",
RunnerNoTx: func(m popx.Migration, c *pop.Connection) error {
return nil
},
Runner: func(m popx.Migration, c *pop.Connection, tx *pop.Tx) error {
return nil
},
},
{
Path: "transactional",
Version: "1",
Name: "gomigration_tx",
Direction: "down",
Type: "go",
DBType: "all",
RunnerNoTx: func(m popx.Migration, c *pop.Connection) error {
return nil
},
},
}))
require.ErrorContains(t, err, "incompatible transaction and non-transaction runners defined")
require.Nil(t, mb)

mb, err = popx.NewMigrationBox(empty, popx.NewMigrator(nil, logrusx.New("", ""), nil, 0), popx.WithGoMigrations(
popx.Migrations{
{
Path: "transactional",
Version: "1",
Name: "gomigration_tx",
Direction: "up",
Type: "go",
DBType: "all",
RunnerNoTx: nil,
Runner: nil,
},
{
Path: "transactional",
Version: "1",
Name: "gomigration_tx",
Direction: "down",
Type: "go",
DBType: "all",
RunnerNoTx: nil,
Runner: nil,
},
}))
require.ErrorContains(t, err, "no runner defined")
require.Nil(t, mb)
}

func TestNoTransaction(t *testing.T) {
c, err := pop.NewConnection(&pop.ConnectionDetails{
URL: "sqlite://file::memory:",
})
require.NoError(t, err)
require.NoError(t, c.Open())

require.NoError(t, c.RawQuery("CREATE TABLE tests (i INTEGER, j INTEGER)").Exec())

up1, up2 := make(chan struct{}), make(chan struct{})
down1, down2 := make(chan struct{}), make(chan struct{})
rnd := rand.NewSource(time.Now().Unix())
i1, i2, j1, j2 := rnd.Int63(), rnd.Int63(), rnd.Int63(), rnd.Int63()
mb, err := popx.NewMigrationBox(empty, popx.NewMigrator(c, logrusx.New("", ""), nil, 0), popx.WithGoMigrations(
popx.Migrations{
{
Path: "gomigration_notx",
Version: "1",
Name: "gomigration no transaction",
Direction: "up",
Type: "go",
DBType: "all",
RunnerNoTx: func(m popx.Migration, c *pop.Connection) error {
if _, err := c.Store.Exec("INSERT INTO tests (i, j) VALUES (?, ?)", i1, j1); err != nil {
return errors.WithStack(err)
}
close(up1)
<-up2
return nil
},
},
{
Path: "gomigration_notx",
Version: "1",
Name: "gomigration no transaction",
Direction: "down",
Type: "go",
DBType: "all",
RunnerNoTx: func(m popx.Migration, c *pop.Connection) error {
if _, err := c.Store.Exec("INSERT INTO tests (i, j) VALUES (?, ?)", i2, j2); err != nil {
return errors.WithStack(err)
}
close(down1)
<-down2
return nil
},
},
},
))
require.NoError(t, err)
errs := make(chan error, 10)
go func() {
errs <- mb.Up(context.Background())
}()
<-up1
var j int64
require.NoError(t, c.Store.Get(&j, "SELECT j FROM tests WHERE i = ?", i1))
assert.Equal(t, j1, j)
close(up2)
assert.NoError(t, <-errs)

go func() {
errs <- mb.Down(context.Background(), 20)
}()
<-down1
j = 0
require.NoError(t, c.Store.Get(&j, "SELECT j FROM tests WHERE i = ?", i2))
assert.Equal(t, j2, j)
close(down2)
assert.NoError(t, <-errs)
}
26 changes: 16 additions & 10 deletions popx/migration_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
package popx

import (
"fmt"
"sort"

"github.com/pkg/errors"

"github.com/gobuffalo/pop/v6"
)

Expand All @@ -18,23 +19,28 @@ type Migration struct {
Version string
// Name of the migration (create_widgets)
Name string
// Direction of the migration (up)
// Direction of the migration (up|down)
Direction string
// Type of migration (sql)
// Type of migration (sql|go)
Type string
// DB type (all|postgres|mysql...)
DBType string
// Runner function to run/execute the migration
// Runner function to run/execute the migration. Will be wrapped in a
// database transaction. Mutually exclusive with RunnerNoTx
Runner func(Migration, *pop.Connection, *pop.Tx) error
// RunnerNoTx function to run/execute the migration. NOT wrapped in a
// database transaction. Mutually exclusive with Runner.
RunnerNoTx func(Migration, *pop.Connection) error
}

// Run the migration. Returns an error if there is
// no mf.Runner defined.
func (mf Migration) Run(c *pop.Connection, tx *pop.Tx) error {
if mf.Runner == nil {
return fmt.Errorf("no runner defined for %s", mf.Path)
func (m Migration) Valid() error {
if m.Runner == nil && m.RunnerNoTx == nil {
return errors.Errorf("no runner defined for %s", m.Path)
}
if m.Runner != nil && m.RunnerNoTx != nil {
return errors.Errorf("incompatible transaction and non-transaction runners defined for %s", m.Path)
}
return mf.Runner(mf, c, tx)
return nil
}

// Migrations is a collection of Migration
Expand Down
84 changes: 56 additions & 28 deletions popx/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,20 @@ func (m *Migrator) UpTo(ctx context.Context, step int) (applied int, err error)
mtn := m.sanitizedMigrationTableName(c)
mfs := m.Migrations["up"].SortAndFilter(c.Dialect.Name())
for _, mi := range mfs {
l := m.l.WithField("version", mi.Version).WithField("migration_name", mi.Name).WithField("migration_file", mi.Path)

exists, err := c.Where("version = ?", mi.Version).Exists(mtn)
if err != nil {
return errors.Wrapf(err, "problem checking for migration version %s", mi.Version)
}

if exists {
m.l.WithField("version", mi.Version).Debug("Migration has already been applied, skipping.")
l.Debug("Migration has already been applied, skipping.")
continue
}

if len(mi.Version) > 14 {
m.l.WithField("version", mi.Version).Debug("Migration has not been applied but it might be a legacy migration, investigating.")
l.Debug("Migration has not been applied but it might be a legacy migration, investigating.")

legacyVersion := mi.Version[:14]
exists, err = c.Where("version = ?", legacyVersion).Exists(mtn)
Expand All @@ -121,7 +123,7 @@ func (m *Migrator) UpTo(ctx context.Context, step int) (applied int, err error)
}

if exists {
m.l.WithField("version", mi.Version).WithField("legacy_version", legacyVersion).WithField("migration_table", mtn).Debug("Migration has already been applied in a legacy migration run. Updating version in migration table.")
l.WithField("legacy_version", legacyVersion).WithField("migration_table", mtn).Debug("Migration has already been applied in a legacy migration run. Updating version in migration table.")
if err := m.isolatedTransaction(ctx, "init-migrate", func(conn *pop.Connection) error {
// We do not want to remove the legacy migration version or subsequent migrations might be applied twice.
//
Expand All @@ -141,23 +143,40 @@ func (m *Migrator) UpTo(ctx context.Context, step int) (applied int, err error)
}
}

m.l.WithField("version", mi.Version).Debug("Migration has not yet been applied, running migration.")
l.Info("Migration has not yet been applied, running migration.")

if err := mi.Valid(); err != nil {
return err
}

if mi.Runner != nil {
err := m.isolatedTransaction(ctx, "up", func(conn *pop.Connection) error {
if err := mi.Runner(mi, conn, conn.TX); err != nil {
return err
}

if err = m.isolatedTransaction(ctx, "up", func(conn *pop.Connection) error {
if err := mi.Run(conn, conn.TX); err != nil {
// #nosec G201 - mtn is a system-wide const
if err := conn.RawQuery(fmt.Sprintf("INSERT INTO %s (version) VALUES (?)", mtn), mi.Version).Exec(); err != nil {
return errors.Wrapf(err, "problem inserting migration version %s", mi.Version)
}
return nil
})
if err != nil {
return err
}
} else {
l.Warn("Migration has requested running outside a transaction. Proceed with caution.")
if err := mi.RunnerNoTx(mi, c); err != nil {
return err
}

// #nosec G201 - mtn is a system-wide const
if _, err = conn.TX.Exec(fmt.Sprintf("INSERT INTO %s (version) VALUES ('%s')", mtn, mi.Version)); err != nil {
return errors.Wrapf(err, "problem inserting migration version %s", mi.Version)
if err := c.RawQuery(fmt.Sprintf("INSERT INTO %s (version) VALUES (?)", mtn), mi.Version).Exec(); err != nil {
return errors.Wrapf(err, "problem inserting migration version %s. YOUR DATABASE MAY BE IN AN INCONSISTENT STATE! MANUAL INTERVENTION REQUIRED!", mi.Version)
}
return nil
}); err != nil {
return err
}

m.l.Debugf("> %s", mi.Name)
l.Infof("> %s applied successfully", mi.Name)
applied++
if step > 0 && applied >= step {
break
Expand Down Expand Up @@ -215,21 +234,37 @@ func (m *Migrator) Down(ctx context.Context, step int) error {
return errors.Errorf("migration version %s does not exist", mi.Version)
}

err = m.isolatedTransaction(ctx, "down", func(conn *pop.Connection) error {
err := mi.Run(conn, conn.TX)
if err := mi.Valid(); err != nil {
return err
}

if mi.Runner != nil {
err := m.isolatedTransaction(ctx, "down", func(conn *pop.Connection) error {
err := mi.Runner(mi, conn, conn.TX)
if err != nil {
return err
}

// #nosec G201 - mtn is a system-wide const
if err := conn.RawQuery(fmt.Sprintf("DELETE FROM %s WHERE version = ?", mtn), mi.Version).Exec(); err != nil {
return errors.Wrapf(err, "problem deleting migration version %s", mi.Version)
}

return nil
})
if err != nil {
return err
}
} else {
err := mi.RunnerNoTx(mi, c)
if err != nil {
return err
}

// #nosec G201 - mtn is a system-wide const
if err = conn.RawQuery(fmt.Sprintf("DELETE FROM %s WHERE version = ?", mtn), mi.Version).Exec(); err != nil {
return errors.Wrapf(err, "problem deleting migration version %s", mi.Version)
if err := c.RawQuery(fmt.Sprintf("DELETE FROM %s WHERE version = ?", mtn), mi.Version).Exec(); err != nil {
return errors.Wrapf(err, "problem deleting migration version %s. YOUR DATABASE MAY BE IN AN INCONSISTENT STATE! MANUAL INTERVENTION REQUIRED!", mi.Version)
}

return nil
})
if err != nil {
return err
}

m.l.Debugf("< %s", mi.Name)
Expand Down Expand Up @@ -512,13 +547,6 @@ func (m *Migrator) DumpMigrationSchema(ctx context.Context) error {
return nil
}

func (m *Migrator) wrapSpan(ctx context.Context, opName string, f func(ctx context.Context, span trace.Span) error) error {
span, ctx := m.startSpan(ctx, opName)
defer span.End()

return f(ctx, span)
}

func (m *Migrator) startSpan(ctx context.Context, opName string) (trace.Span, context.Context) {
tracer := otel.Tracer(tracingComponent)
if m.tracer.IsLoaded() {
Expand Down

0 comments on commit 1c62a8a

Please sign in to comment.