Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel migration (#18815) #18817

Merged
merged 10 commits into from
Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog/18817.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
migration: allow parallelization of key migration for `vault operator migrate` to speed up migrations.
```
54 changes: 37 additions & 17 deletions command/operator_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io/ioutil"
"math"
"net/url"
"os"
"sort"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/mitchellh/cli"
"github.com/pkg/errors"
"github.com/posener/complete"
"golang.org/x/sync/errgroup"
)

var (
Expand All @@ -39,6 +41,7 @@ type OperatorMigrateCommand struct {
flagLogLevel string
flagStart string
flagReset bool
flagMaxParallel int
logger log.Logger
ShutdownCh chan struct{}
}
Expand Down Expand Up @@ -98,6 +101,14 @@ func (c *OperatorMigrateCommand) Flags() *FlagSets {
Usage: "Reset the migration lock. No migration will occur.",
})

f.IntVar(&IntVar{
Name: "max-parallel",
Default: 1,
peteski22 marked this conversation as resolved.
Show resolved Hide resolved
Target: &c.flagMaxParallel,
Usage: "Specifies the maximum number of parallel migration threads (goroutines) that may be used when migrating. " +
"This can speed up the migration process on slow backends but uses more resources.",
})

f.StringVar(&StringVar{
Name: "log-level",
Target: &c.flagLogLevel,
Expand Down Expand Up @@ -126,7 +137,6 @@ func (c *OperatorMigrateCommand) Run(args []string) int {
c.UI.Error(err.Error())
return 1
}

c.flagLogLevel = strings.ToLower(c.flagLogLevel)
validLevels := []string{"trace", "debug", "info", "warn", "error"}
if !strutil.StrListContains(validLevels, c.flagLogLevel) {
Expand All @@ -135,6 +145,11 @@ func (c *OperatorMigrateCommand) Run(args []string) int {
}
c.logger = logging.NewVaultLogger(log.LevelFromString(c.flagLogLevel))

if c.flagMaxParallel < 1 {
c.UI.Error(fmt.Sprintf("Argument to flag -max-parallel must be between 1 and %d", math.MaxInt32))
return 1
}

if c.flagConfig == "" {
c.UI.Error("Must specify exactly one config path using -config")
return 1
Expand Down Expand Up @@ -164,7 +179,7 @@ func (c *OperatorMigrateCommand) Run(args []string) int {
}

// migrate attempts to instantiate the source and destinations backends,
// and then invoke the migration the the root of the keyspace.
// and then invoke the migration the root of the keyspace.
func (c *OperatorMigrateCommand) migrate(config *migratorConfig) error {
from, err := c.newBackend(config.StorageSource.Type, config.StorageSource.Config)
if err != nil {
Expand Down Expand Up @@ -209,7 +224,7 @@ func (c *OperatorMigrateCommand) migrate(config *migratorConfig) error {

doneCh := make(chan error)
go func() {
doneCh <- c.migrateAll(ctx, from, to)
doneCh <- c.migrateAll(ctx, from, to, c.flagMaxParallel)
}()

select {
Expand All @@ -225,8 +240,8 @@ func (c *OperatorMigrateCommand) migrate(config *migratorConfig) error {
}

// migrateAll copies all keys in lexicographic order.
func (c *OperatorMigrateCommand) migrateAll(ctx context.Context, from physical.Backend, to physical.Backend) error {
return dfsScan(ctx, from, func(ctx context.Context, path string) error {
func (c *OperatorMigrateCommand) migrateAll(ctx context.Context, from physical.Backend, to physical.Backend, maxParallel int) error {
return dfsScan(ctx, from, maxParallel, func(ctx context.Context, path string) error {
if path < c.flagStart || path == storageMigrationLock || path == vault.CoreLockPath {
return nil
}
Expand Down Expand Up @@ -365,10 +380,20 @@ func parseStorage(result *migratorConfig, list *ast.ObjectList, name string) err

// dfsScan will invoke cb with every key from source.
// Keys will be traversed in lexicographic, depth-first order.
func dfsScan(ctx context.Context, source physical.Backend, cb func(ctx context.Context, path string) error) error {
func dfsScan(ctx context.Context, source physical.Backend, maxParallel int, cb func(ctx context.Context, path string) error) error {
dfs := []string{""}

eg, ctx := errgroup.WithContext(ctx)
peteski22 marked this conversation as resolved.
Show resolved Hide resolved
eg.SetLimit(maxParallel)

for l := len(dfs); l > 0; l = len(dfs) {
// Check for cancellation
select {
case <-ctx.Done():
peteski22 marked this conversation as resolved.
Show resolved Hide resolved
return ctx.Err()
default:
}

key := dfs[len(dfs)-1]
if key == "" || strings.HasSuffix(key, "/") {
children, err := source.List(ctx, key)
Expand All @@ -385,19 +410,14 @@ func dfsScan(ctx context.Context, source physical.Backend, cb func(ctx context.C
}
}
} else {
err := cb(ctx, key)
if err != nil {
return err
}
// Pooling
eg.Go(func() error {
ncabatoff marked this conversation as resolved.
Show resolved Hide resolved
return cb(ctx, key)
})

dfs = dfs[:len(dfs)-1]
}

select {
case <-ctx.Done():
return nil
default:
}
}
return nil

return eg.Wait()
}
119 changes: 102 additions & 17 deletions command/operator_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"bytes"
"context"
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"reflect"
"sort"
"strings"
"sync"
"testing"
"time"

Expand All @@ -35,8 +35,8 @@ func TestMigration(t *testing.T) {

fromFactory := physicalBackends["file"]

folder := filepath.Join(os.TempDir(), testhelpers.RandomWithPrefix("migrator"))
defer os.RemoveAll(folder)
folder := filepath.Join(t.TempDir(), testhelpers.RandomWithPrefix("migrator"))
peteski22 marked this conversation as resolved.
Show resolved Hide resolved

confFrom := map[string]string{
"path": folder,
}
Expand All @@ -55,14 +55,51 @@ func TestMigration(t *testing.T) {
if err != nil {
t.Fatal(err)
}

cmd := OperatorMigrateCommand{
logger: log.NewNullLogger(),
}
if err := cmd.migrateAll(context.Background(), from, to); err != nil {
if err := cmd.migrateAll(context.Background(), from, to, 1); err != nil {
t.Fatal(err)
}

if err := compareStoredData(to, data, ""); err != nil {
t.Fatal(err)
}
})

t.Run("Concurrent migration", func(t *testing.T) {
data := generateData()

fromFactory := physicalBackends["file"]

folder := filepath.Join(t.TempDir(), testhelpers.RandomWithPrefix("migrator"))

confFrom := map[string]string{
"path": folder,
}

from, err := fromFactory(confFrom, nil)
if err != nil {
t.Fatal(err)
}
if err := storeData(from, data); err != nil {
t.Fatal(err)
}

toFactory := physicalBackends["inmem"]
confTo := map[string]string{}
to, err := toFactory(confTo, nil)
if err != nil {
t.Fatal(err)
}

cmd := OperatorMigrateCommand{
logger: log.NewNullLogger(),
}

if err := cmd.migrateAll(context.Background(), from, to, 10); err != nil {
t.Fatal(err)
}
if err := compareStoredData(to, data, ""); err != nil {
t.Fatal(err)
}
Expand All @@ -82,8 +119,46 @@ func TestMigration(t *testing.T) {
}

toFactory := physicalBackends["file"]
folder := filepath.Join(os.TempDir(), testhelpers.RandomWithPrefix("migrator"))
defer os.RemoveAll(folder)
folder := filepath.Join(t.TempDir(), testhelpers.RandomWithPrefix("migrator"))
confTo := map[string]string{
"path": folder,
}

to, err := toFactory(confTo, nil)
if err != nil {
t.Fatal(err)
}

const start = "m"

cmd := OperatorMigrateCommand{
logger: log.NewNullLogger(),
flagStart: start,
}
if err := cmd.migrateAll(context.Background(), from, to, 1); err != nil {
t.Fatal(err)
}

if err := compareStoredData(to, data, start); err != nil {
t.Fatal(err)
}
})

t.Run("Start option (parallel)", func(t *testing.T) {
data := generateData()

fromFactory := physicalBackends["inmem"]
confFrom := map[string]string{}
from, err := fromFactory(confFrom, nil)
if err != nil {
t.Fatal(err)
}
if err := storeData(from, data); err != nil {
t.Fatal(err)
}

toFactory := physicalBackends["file"]
folder := filepath.Join(t.TempDir(), testhelpers.RandomWithPrefix("migrator"))
confTo := map[string]string{
"path": folder,
}
Expand All @@ -99,7 +174,7 @@ func TestMigration(t *testing.T) {
logger: log.NewNullLogger(),
flagStart: start,
}
if err := cmd.migrateAll(context.Background(), from, to); err != nil {
if err := cmd.migrateAll(context.Background(), from, to, 10); err != nil {
t.Fatal(err)
}

Expand All @@ -111,16 +186,15 @@ func TestMigration(t *testing.T) {
t.Run("Config parsing", func(t *testing.T) {
cmd := new(OperatorMigrateCommand)

cfgName := filepath.Join(os.TempDir(), testhelpers.RandomWithPrefix("migrator"))
ioutil.WriteFile(cfgName, []byte(`
cfgName := filepath.Join(t.TempDir(), testhelpers.RandomWithPrefix("migrator"))
os.WriteFile(cfgName, []byte(`
storage_source "src_type" {
path = "src_path"
}

storage_destination "dest_type" {
path = "dest_path"
}`), 0o644)
defer os.Remove(cfgName)

expCfg := &migratorConfig{
StorageSource: &server.Storage{
Expand All @@ -145,7 +219,7 @@ storage_destination "dest_type" {
}

verifyBad := func(cfg string) {
ioutil.WriteFile(cfgName, []byte(cfg), 0o644)
os.WriteFile(cfgName, []byte(cfg), 0o644)
_, err := cmd.loadMigratorConfig(cfgName)
if err == nil {
t.Fatalf("expected error but none received from: %v", cfg)
Expand Down Expand Up @@ -192,6 +266,7 @@ storage_destination "dest_type2" {
path = "dest_path"
}`)
})

t.Run("DFS Scan", func(t *testing.T) {
s, _ := physicalBackends["inmem"](map[string]string{}, nil)

Expand All @@ -204,9 +279,16 @@ storage_destination "dest_type2" {

l := randomLister{s}

var out []string
dfsScan(context.Background(), l, func(ctx context.Context, path string) error {
out = append(out, path)
type SafeAppend struct {
out []string
lock sync.Mutex
}
outKeys := SafeAppend{}
dfsScan(context.Background(), l, 10, func(ctx context.Context, path string) error {
outKeys.lock.Lock()
defer outKeys.lock.Unlock()

outKeys.out = append(outKeys.out, path)
return nil
})

Expand All @@ -218,8 +300,11 @@ storage_destination "dest_type2" {
keys = append(keys, key)
}
sort.Strings(keys)
if !reflect.DeepEqual(keys, out) {
t.Fatalf("expected equal: %v, %v", keys, out)
outKeys.lock.Lock()
sort.Strings(outKeys.out)
outKeys.lock.Unlock()
if !reflect.DeepEqual(keys, outKeys.out) {
t.Fatalf("expected equal: %v, %v", keys, outKeys.out)
}
})
}
Expand Down
8 changes: 8 additions & 0 deletions website/content/docs/commands/operator/migrate.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,11 @@ The following flags are available for the `operator migrate` command.
- `-reset` - Reset the migration lock. A lock file is added during migration to prevent
starting the Vault server or another migration. The `-reset` option can be used to
remove a stale lock file if present.

- `-max-parallel` `int: 1` - Allows the operator to specify the maximum number of lightweight threads (goroutines)
  which may be used to migrate data in parallel. This can potentially speed up migration on slower backends at
  the cost of more resources (e.g. CPU, memory). Permitted values range from `1` (the default, which migrates
  serially) up to the maximum value of an `integer`; supplying a value greater than `1` enables parallel migration.

~> Note: The maximum number of concurrent requests a storage backend will actually handle is ultimately governed
by that backend's own configuration, which may enforce its own concurrent-request limit (`max_parallel`).