Skip to content

Commit a185b9e

Browse files
authored
Merge pull request #495 from AnatolyRugalev/postgresql-support-multistatement
Support multi-statement execution for PostgreSQL
2 parents dab829b + 42abb24 commit a185b9e

File tree

3 files changed

+97
-16
lines changed

3 files changed

+97
-16
lines changed

database/postgres/README.md

+9
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
|------------|---------------------|-------------|
77
| `x-migrations-table` | `MigrationsTable` | Name of the migrations table |
88
| `x-statement-timeout` | `StatementTimeout` | Abort any statement that takes more than the specified number of milliseconds |
9+
| `x-multi-statement` | `MultiStatementEnabled` | Enable multi-statement execution (default: false) |
10+
| `x-multi-statement-max-size` | `MultiStatementMaxSize` | Maximum size of single statement in bytes (default: 10MB) |
911
| `dbname` | `DatabaseName` | The name of the database to connect to |
1012
| `search_path` | | This variable specifies the order in which schemas are searched when an object is referenced by a simple name with no schema specified. |
1113
| `user` | | The user to sign in as |
@@ -27,3 +29,10 @@
2729
2. Wrap your existing migrations in transactions ([BEGIN/COMMIT](https://www.postgresql.org/docs/current/static/transaction-iso.html)) if you use multiple statements within one migration.
2830
3. Download and install the latest migrate version.
2931
4. Force the current migration version with `migrate force <current_version>`.
32+
33+
## Multi-statement mode
34+
35+
In PostgreSQL running multiple SQL statements in one `Exec` executes them inside a transaction. Sometimes this
36+
behavior is not desirable because some statements can be only run outside of transaction (e.g.
37+
`CREATE INDEX CONCURRENTLY`). If you want to use `CREATE INDEX CONCURRENTLY` without activating multi-statement mode
38+
you have to put such statements in a separate migration files.

database/postgres/postgres.go

+51-13
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"github.com/golang-migrate/migrate/v4"
1717
"github.com/golang-migrate/migrate/v4/database"
18+
"github.com/golang-migrate/migrate/v4/database/multistmt"
1819
multierror "github.com/hashicorp/go-multierror"
1920
"github.com/lib/pq"
2021
)
@@ -25,7 +26,12 @@ func init() {
2526
database.Register("postgresql", &db)
2627
}
2728

28-
var DefaultMigrationsTable = "schema_migrations"
29+
var (
30+
multiStmtDelimiter = []byte(";")
31+
32+
DefaultMigrationsTable = "schema_migrations"
33+
DefaultMultiStatementMaxSize = 10 * 1 << 20 // 10 MB
34+
)
2935

3036
var (
3137
ErrNilConfig = fmt.Errorf("no config")
@@ -35,10 +41,12 @@ var (
3541
)
3642

3743
type Config struct {
38-
MigrationsTable string
39-
DatabaseName string
40-
SchemaName string
41-
StatementTimeout time.Duration
44+
MigrationsTable string
45+
DatabaseName string
46+
SchemaName string
47+
StatementTimeout time.Duration
48+
MultiStatementEnabled bool
49+
MultiStatementMaxSize int
4250
}
4351

4452
type Postgres struct {
@@ -132,10 +140,23 @@ func (p *Postgres) Open(url string) (database.Driver, error) {
132140
}
133141
}
134142

143+
multiStatementMaxSize := DefaultMultiStatementMaxSize
144+
if s := purl.Query().Get("x-multi-statement-max-size"); len(s) > 0 {
145+
multiStatementMaxSize, err = strconv.Atoi(s)
146+
if err != nil {
147+
return nil, err
148+
}
149+
if multiStatementMaxSize <= 0 {
150+
multiStatementMaxSize = DefaultMultiStatementMaxSize
151+
}
152+
}
153+
135154
px, err := WithInstance(db, &Config{
136-
DatabaseName: purl.Path,
137-
MigrationsTable: migrationsTable,
138-
StatementTimeout: time.Duration(statementTimeout) * time.Millisecond,
155+
DatabaseName: purl.Path,
156+
MigrationsTable: migrationsTable,
157+
StatementTimeout: time.Duration(statementTimeout) * time.Millisecond,
158+
MultiStatementEnabled: purl.Query().Get("x-multi-statement") == "true",
159+
MultiStatementMaxSize: multiStatementMaxSize,
139160
})
140161

141162
if err != nil {
@@ -194,18 +215,36 @@ func (p *Postgres) Unlock() error {
194215
}
195216

196217
func (p *Postgres) Run(migration io.Reader) error {
218+
if p.config.MultiStatementEnabled {
219+
var err error
220+
if e := multistmt.Parse(migration, multiStmtDelimiter, p.config.MultiStatementMaxSize, func(m []byte) bool {
221+
if err = p.runStatement(m); err != nil {
222+
return false
223+
}
224+
return true
225+
}); e != nil {
226+
return e
227+
}
228+
return err
229+
}
197230
migr, err := ioutil.ReadAll(migration)
198231
if err != nil {
199232
return err
200233
}
234+
return p.runStatement(migr)
235+
}
236+
237+
func (p *Postgres) runStatement(statement []byte) error {
201238
ctx := context.Background()
202239
if p.config.StatementTimeout != 0 {
203240
var cancel context.CancelFunc
204241
ctx, cancel = context.WithTimeout(ctx, p.config.StatementTimeout)
205242
defer cancel()
206243
}
207-
// run migration
208-
query := string(migr[:])
244+
query := string(statement)
245+
if strings.TrimSpace(query) == "" {
246+
return nil
247+
}
209248
if _, err := p.conn.ExecContext(ctx, query); err != nil {
210249
if pgErr, ok := err.(*pq.Error); ok {
211250
var line uint
@@ -223,11 +262,10 @@ func (p *Postgres) Run(migration io.Reader) error {
223262
if pgErr.Detail != "" {
224263
message = fmt.Sprintf("%s, %s", message, pgErr.Detail)
225264
}
226-
return database.Error{OrigErr: err, Err: message, Query: migr, Line: line}
265+
return database.Error{OrigErr: err, Err: message, Query: statement, Line: line}
227266
}
228-
return database.Error{OrigErr: err, Err: "migration failed", Query: migr}
267+
return database.Error{OrigErr: err, Err: "migration failed", Query: statement}
229268
}
230-
231269
return nil
232270
}
233271

database/postgres/postgres_test.go

+37-3
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@ var (
4242
}
4343
)
4444

45-
func pgConnectionString(host, port string) string {
46-
return fmt.Sprintf("postgres://postgres:%s@%s:%s/postgres?sslmode=disable", pgPassword, host, port)
45+
func pgConnectionString(host, port string, options ...string) string {
46+
options = append(options, "sslmode=disable")
47+
return fmt.Sprintf("postgres://postgres:%s@%s:%s/postgres?%s", pgPassword, host, port, strings.Join(options, "&"))
4748
}
4849

4950
func isReady(ctx context.Context, c dktest.ContainerInfo) bool {
@@ -122,7 +123,7 @@ func TestMigrate(t *testing.T) {
122123
})
123124
}
124125

125-
func TestMultiStatement(t *testing.T) {
126+
func TestMultipleStatements(t *testing.T) {
126127
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
127128
ip, port, err := c.FirstPort()
128129
if err != nil {
@@ -155,6 +156,39 @@ func TestMultiStatement(t *testing.T) {
155156
})
156157
}
157158

159+
func TestMultipleStatementsInMultiStatementMode(t *testing.T) {
160+
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
161+
ip, port, err := c.FirstPort()
162+
if err != nil {
163+
t.Fatal(err)
164+
}
165+
166+
addr := pgConnectionString(ip, port, "x-multi-statement=true")
167+
p := &Postgres{}
168+
d, err := p.Open(addr)
169+
if err != nil {
170+
t.Fatal(err)
171+
}
172+
defer func() {
173+
if err := d.Close(); err != nil {
174+
t.Error(err)
175+
}
176+
}()
177+
if err := d.Run(strings.NewReader("CREATE TABLE foo (foo text); CREATE INDEX CONCURRENTLY idx_foo ON foo (foo);")); err != nil {
178+
t.Fatalf("expected err to be nil, got %v", err)
179+
}
180+
181+
// make sure created index exists
182+
var exists bool
183+
if err := d.(*Postgres).conn.QueryRowContext(context.Background(), "SELECT EXISTS (SELECT 1 FROM pg_indexes WHERE schemaname = (SELECT current_schema()) AND indexname = 'idx_foo')").Scan(&exists); err != nil {
184+
t.Fatal(err)
185+
}
186+
if !exists {
187+
t.Fatalf("expected table bar to exist")
188+
}
189+
})
190+
}
191+
158192
func TestErrorParsing(t *testing.T) {
159193
dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
160194
ip, port, err := c.FirstPort()

0 commit comments

Comments
 (0)