From 1c62a8a22f61052675b5cefe1ad45cb654677eaa Mon Sep 17 00:00:00 2001 From: Arne Luenser Date: Mon, 26 Jun 2023 11:37:58 +0200 Subject: [PATCH] feat: allow Go migrations outside of an SQL transaction (#696) Co-authored-by: aeneasr <3372410+aeneasr@users.noreply.github.com> --- popx/migration_box.go | 8 ++ popx/migration_box_gomigration_test.go | 132 +++++++++++++++++++++++++ popx/migration_info.go | 26 +++-- popx/migrator.go | 84 ++++++++++------ 4 files changed, 212 insertions(+), 38 deletions(-) diff --git a/popx/migration_box.go b/popx/migration_box.go index 740e3a49..f5919f6d 100644 --- a/popx/migration_box.go +++ b/popx/migration_box.go @@ -249,5 +249,13 @@ func (fm *MigrationBox) check() error { return errors.Errorf("migration %s has no corresponding down migration", up.Version) } } + + for _, m := range fm.Migrations { + for _, n := range m { + if err := n.Valid(); err != nil { + return err + } + } + } return nil } diff --git a/popx/migration_box_gomigration_test.go b/popx/migration_box_gomigration_test.go index cbe5470d..b685c6f0 100644 --- a/popx/migration_box_gomigration_test.go +++ b/popx/migration_box_gomigration_test.go @@ -6,6 +6,7 @@ package popx_test import ( "context" "database/sql" + "math/rand" "testing" "time" @@ -160,3 +161,134 @@ func TestGoMigrations(t *testing.T) { assert.ErrorIs(t, c.Where("i=1").First(tt), sql.ErrNoRows, "%+v", tt) }) } + +func TestIncompatibleRunners(t *testing.T) { + mb, err := popx.NewMigrationBox(empty, popx.NewMigrator(nil, logrusx.New("", ""), nil, 0), popx.WithGoMigrations( + popx.Migrations{ + { + Path: "transactional", + Version: "1", + Name: "gomigration_tx", + Direction: "up", + Type: "go", + DBType: "all", + RunnerNoTx: func(m popx.Migration, c *pop.Connection) error { + return nil + }, + Runner: func(m popx.Migration, c *pop.Connection, tx *pop.Tx) error { + return nil + }, + }, + { + Path: "transactional", + Version: "1", + Name: "gomigration_tx", + Direction: "down", + Type: "go", + DBType: "all", + RunnerNoTx: func(m popx.Migration, c *pop.Connection) error { + return nil + }, + }, + })) + require.ErrorContains(t, err, "incompatible transaction and non-transaction runners defined") + require.Nil(t, mb) + + mb, err = popx.NewMigrationBox(empty, popx.NewMigrator(nil, logrusx.New("", ""), nil, 0), popx.WithGoMigrations( + popx.Migrations{ + { + Path: "transactional", + Version: "1", + Name: "gomigration_tx", + Direction: "up", + Type: "go", + DBType: "all", + RunnerNoTx: nil, + Runner: nil, + }, + { + Path: "transactional", + Version: "1", + Name: "gomigration_tx", + Direction: "down", + Type: "go", + DBType: "all", + RunnerNoTx: nil, + Runner: nil, + }, + })) + require.ErrorContains(t, err, "no runner defined") + require.Nil(t, mb) +} + +func TestNoTransaction(t *testing.T) { + c, err := pop.NewConnection(&pop.ConnectionDetails{ + URL: "sqlite://file::memory:", + }) + require.NoError(t, err) + require.NoError(t, c.Open()) + + require.NoError(t, c.RawQuery("CREATE TABLE tests (i INTEGER, j INTEGER)").Exec()) + + up1, up2 := make(chan struct{}), make(chan struct{}) + down1, down2 := make(chan struct{}), make(chan struct{}) + rnd := rand.NewSource(time.Now().Unix()) + i1, i2, j1, j2 := rnd.Int63(), rnd.Int63(), rnd.Int63(), rnd.Int63() + mb, err := popx.NewMigrationBox(empty, popx.NewMigrator(c, logrusx.New("", ""), nil, 0), popx.WithGoMigrations( + popx.Migrations{ + { + Path: "gomigration_notx", + Version: "1", + Name: "gomigration no transaction", + Direction: "up", + Type: "go", + DBType: "all", + RunnerNoTx: func(m popx.Migration, c *pop.Connection) error { + if _, err := c.Store.Exec("INSERT INTO tests (i, j) VALUES (?, ?)", i1, j1); err != nil { + return errors.WithStack(err) + } + close(up1) + <-up2 + return nil + }, + }, + { + Path: "gomigration_notx", + Version: "1", + Name: "gomigration no transaction", + Direction: "down", + Type: "go", + DBType: "all", + RunnerNoTx: func(m popx.Migration, c *pop.Connection) error { + if _, err := c.Store.Exec("INSERT INTO tests (i, j) VALUES (?, ?)", i2, j2); err != nil { + return errors.WithStack(err) + } + close(down1) + <-down2 + return nil + }, + }, + }, + )) + require.NoError(t, err) + errs := make(chan error, 10) + go func() { + errs <- mb.Up(context.Background()) + }() + <-up1 + var j int64 + require.NoError(t, c.Store.Get(&j, "SELECT j FROM tests WHERE i = ?", i1)) + assert.Equal(t, j1, j) + close(up2) + assert.NoError(t, <-errs) + + go func() { + errs <- mb.Down(context.Background(), 20) + }() + <-down1 + j = 0 + require.NoError(t, c.Store.Get(&j, "SELECT j FROM tests WHERE i = ?", i2)) + assert.Equal(t, j2, j) + close(down2) + assert.NoError(t, <-errs) +} diff --git a/popx/migration_info.go b/popx/migration_info.go index 0416d8cd..92b9738e 100644 --- a/popx/migration_info.go +++ b/popx/migration_info.go @@ -4,9 +4,10 @@ package popx import ( - "fmt" "sort" + "github.com/pkg/errors" + "github.com/gobuffalo/pop/v6" ) @@ -18,23 +19,28 @@ type Migration struct { Version string // Name of the migration (create_widgets) Name string - // Direction of the migration (up) + // Direction of the migration (up|down) Direction string - // Type of migration (sql) + // Type of migration (sql|go) Type string // DB type (all|postgres|mysql...) DBType string - // Runner function to run/execute the migration + // Runner function to run/execute the migration. Will be wrapped in a + // database transaction. Mutually exclusive with RunnerNoTx Runner func(Migration, *pop.Connection, *pop.Tx) error + // RunnerNoTx function to run/execute the migration. NOT wrapped in a + // database transaction. Mutually exclusive with Runner. + RunnerNoTx func(Migration, *pop.Connection) error } -// Run the migration. Returns an error if there is -// no mf.Runner defined. -func (mf Migration) Run(c *pop.Connection, tx *pop.Tx) error { - if mf.Runner == nil { - return fmt.Errorf("no runner defined for %s", mf.Path) +func (m Migration) Valid() error { + if m.Runner == nil && m.RunnerNoTx == nil { + return errors.Errorf("no runner defined for %s", m.Path) + } + if m.Runner != nil && m.RunnerNoTx != nil { + return errors.Errorf("incompatible transaction and non-transaction runners defined for %s", m.Path) } - return mf.Runner(mf, c, tx) + return nil } // Migrations is a collection of Migration diff --git a/popx/migrator.go b/popx/migrator.go index 28eeec59..2748ef50 100644 --- a/popx/migrator.go +++ b/popx/migrator.go @@ -101,18 +101,20 @@ func (m *Migrator) UpTo(ctx context.Context, step int) (applied int, err error) mtn := m.sanitizedMigrationTableName(c) mfs := m.Migrations["up"].SortAndFilter(c.Dialect.Name()) for _, mi := range mfs { + l := m.l.WithField("version", mi.Version).WithField("migration_name", mi.Name).WithField("migration_file", mi.Path) + exists, err := c.Where("version = ?", mi.Version).Exists(mtn) if err != nil { return errors.Wrapf(err, "problem checking for migration version %s", mi.Version) } if exists { - m.l.WithField("version", mi.Version).Debug("Migration has already been applied, skipping.") + l.Debug("Migration has already been applied, skipping.") continue } if len(mi.Version) > 14 { - m.l.WithField("version", mi.Version).Debug("Migration has not been applied but it might be a legacy migration, investigating.") + l.Debug("Migration has not been applied but it might be a legacy migration, investigating.") legacyVersion := mi.Version[:14] exists, err = c.Where("version = ?", legacyVersion).Exists(mtn) @@ -121,7 +123,7 @@ func (m *Migrator) UpTo(ctx context.Context, step int) (applied int, err error) } if exists { - m.l.WithField("version", mi.Version).WithField("legacy_version", legacyVersion).WithField("migration_table", mtn).Debug("Migration has already been applied in a legacy migration run. Updating version in migration table.") + l.WithField("legacy_version", legacyVersion).WithField("migration_table", mtn).Debug("Migration has already been applied in a legacy migration run. Updating version in migration table.") if err := m.isolatedTransaction(ctx, "init-migrate", func(conn *pop.Connection) error { // We do not want to remove the legacy migration version or subsequent migrations might be applied twice. // @@ -141,23 +143,40 @@ func (m *Migrator) UpTo(ctx context.Context, step int) (applied int, err error) } } - m.l.WithField("version", mi.Version).Debug("Migration has not yet been applied, running migration.") + l.Info("Migration has not yet been applied, running migration.") + + if err := mi.Valid(); err != nil { + return err + } + + if mi.Runner != nil { + err := m.isolatedTransaction(ctx, "up", func(conn *pop.Connection) error { + if err := mi.Runner(mi, conn, conn.TX); err != nil { + return err + } - if err = m.isolatedTransaction(ctx, "up", func(conn *pop.Connection) error { - if err := mi.Run(conn, conn.TX); err != nil { + // #nosec G201 - mtn is a system-wide const + if err := conn.RawQuery(fmt.Sprintf("INSERT INTO %s (version) VALUES (?)", mtn), mi.Version).Exec(); err != nil { + return errors.Wrapf(err, "problem inserting migration version %s", mi.Version) + } + return nil + }) + if err != nil { + return err + } + } else { + l.Warn("Migration has requested running outside a transaction. Proceed with caution.") + if err := mi.RunnerNoTx(mi, c); err != nil { return err } // #nosec G201 - mtn is a system-wide const - if _, err = conn.TX.Exec(fmt.Sprintf("INSERT INTO %s (version) VALUES ('%s')", mtn, mi.Version)); err != nil { - return errors.Wrapf(err, "problem inserting migration version %s", mi.Version) + if err := c.RawQuery(fmt.Sprintf("INSERT INTO %s (version) VALUES (?)", mtn), mi.Version).Exec(); err != nil { + return errors.Wrapf(err, "problem inserting migration version %s. YOUR DATABASE MAY BE IN AN INCONSISTENT STATE! MANUAL INTERVENTION REQUIRED!", mi.Version) } - return nil - }); err != nil { - return err } - m.l.Debugf("> %s", mi.Name) + l.Infof("> %s applied successfully", mi.Name) applied++ if step > 0 && applied >= step { break @@ -215,21 +234,37 @@ func (m *Migrator) Down(ctx context.Context, step int) error { return errors.Errorf("migration version %s does not exist", mi.Version) } - err = m.isolatedTransaction(ctx, "down", func(conn *pop.Connection) error { - err := mi.Run(conn, conn.TX) + if err := mi.Valid(); err != nil { + return err + } + + if mi.Runner != nil { + err := m.isolatedTransaction(ctx, "down", func(conn *pop.Connection) error { + err := mi.Runner(mi, conn, conn.TX) + if err != nil { + return err + } + + // #nosec G201 - mtn is a system-wide const + if err := conn.RawQuery(fmt.Sprintf("DELETE FROM %s WHERE version = ?", mtn), mi.Version).Exec(); err != nil { + return errors.Wrapf(err, "problem deleting migration version %s", mi.Version) + } + + return nil + }) + if err != nil { + return err + } + } else { + err := mi.RunnerNoTx(mi, c) if err != nil { return err } // #nosec G201 - mtn is a system-wide const - if err = conn.RawQuery(fmt.Sprintf("DELETE FROM %s WHERE version = ?", mtn), mi.Version).Exec(); err != nil { - return errors.Wrapf(err, "problem deleting migration version %s", mi.Version) + if err := c.RawQuery(fmt.Sprintf("DELETE FROM %s WHERE version = ?", mtn), mi.Version).Exec(); err != nil { + return errors.Wrapf(err, "problem deleting migration version %s. YOUR DATABASE MAY BE IN AN INCONSISTENT STATE! MANUAL INTERVENTION REQUIRED!", mi.Version) } - - return nil - }) - if err != nil { - return err } m.l.Debugf("< %s", mi.Name) @@ -512,13 +547,6 @@ func (m *Migrator) DumpMigrationSchema(ctx context.Context) error { return nil } -func (m *Migrator) wrapSpan(ctx context.Context, opName string, f func(ctx context.Context, span trace.Span) error) error { - span, ctx := m.startSpan(ctx, opName) - defer span.End() - - return f(ctx, span) -} - func (m *Migrator) startSpan(ctx context.Context, opName string) (trace.Span, context.Context) { tracer := otel.Tracer(tracingComponent) if m.tracer.IsLoaded() {