Skip to content

Commit

Permalink
Add create index operation
Browse files Browse the repository at this point in the history
  • Loading branch information
andrew-farries committed Aug 14, 2023
1 parent 758ea6e commit 974a3f9
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 0 deletions.
7 changes: 7 additions & 0 deletions pkg/migrations/op_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
OpNameDropTable OpName = "drop_table"
OpNameAddColumn OpName = "add_column"
OpNameDropColumn OpName = "drop_column"
OpNameCreateIndex OpName = "create_index"
)

func TemporaryName(name string) string {
Expand Down Expand Up @@ -86,6 +87,9 @@ func (v *Operations) UnmarshalJSON(data []byte) error {
case OpNameDropColumn:
item = &OpDropColumn{}

case OpNameCreateIndex:
item = &OpCreateIndex{}

default:
return fmt.Errorf("unknown migration type: %v", opName)
}
Expand Down Expand Up @@ -133,6 +137,9 @@ func (v Operations) MarshalJSON() ([]byte, error) {
case *OpDropColumn:
opName = OpNameDropColumn

case *OpCreateIndex:
opName = OpNameCreateIndex

default:
panic(fmt.Errorf("unknown operation for %T", op))
}
Expand Down
69 changes: 69 additions & 0 deletions pkg/migrations/op_create_index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package migrations

import (
"context"
"database/sql"
"fmt"
"strings"

"github.com/lib/pq"

"pg-roll/pkg/schema"
)

type OpCreateIndex struct {
Table string `json:"table"`
Columns []string `json:"columns"`
}

var _ Operation = (*OpCreateIndex)(nil)

func (o *OpCreateIndex) Start(ctx context.Context, conn *sql.DB, schemaName string, stateSchema string, s *schema.Schema) error {
// create index concurrently
_, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE INDEX CONCURRENTLY IF NOT EXISTS %s ON %s.%s (%s)",
pq.QuoteIdentifier(IndexName(o.Table, o.Columns)),
pq.QuoteIdentifier(schemaName),
pq.QuoteIdentifier(o.Table),
strings.Join(quoteColumnNames(o.Columns), ", ")))
return err
}

func (o *OpCreateIndex) Complete(ctx context.Context, conn *sql.DB) error {
// No-op
return nil
}

func (o *OpCreateIndex) Rollback(ctx context.Context, conn *sql.DB) error {
// drop the index concurrently
_, err := conn.ExecContext(ctx, fmt.Sprintf("DROP INDEX CONCURRENTLY IF EXISTS %s",
IndexName(o.Table, o.Columns)))

return err
}

func (o *OpCreateIndex) Validate(ctx context.Context, s *schema.Schema) error {
table := s.GetTable(o.Table)

if table == nil {
return TableDoesNotExistError{Name: o.Table}
}

for _, column := range o.Columns {
if table.GetColumn(column) == nil {
return ColumnDoesNotExistError{Table: o.Table, Name: column}
}
}

return nil
}

func IndexName(table string, columns []string) string {
return "_pgroll_idx_" + table + "_" + strings.Join(columns, "_")
}

func quoteColumnNames(columns []string) (quoted []string) {
for _, col := range columns {
quoted = append(quoted, pq.QuoteIdentifier(col))
}
return quoted
}

0 comments on commit 974a3f9

Please sign in to comment.