Skip to content

Commit

Permalink
Parallel migration (#18815) (#18817)
Browse files Browse the repository at this point in the history
* Parallel migration (#18815)
* flagParallel sanity check
* Attempt to use ErrGroups
* Updated docs
* Allow 'start' and 'max-parallel' together
* parallel flag renamed to max-parallel
* tests for start + parallel
* Removed permit pool
* Updated docs to make it clearer that a high setting might not be honored based on storage backend setting
* System dependent max int size
* Default max-parallel 1 => 10
* Test folder/paths updated

Co-authored-by: Tomasz Pawelczak <[email protected]>
Co-authored-by: Mike Palmiotto <[email protected]>
  • Loading branch information
3 people authored Jan 25, 2023
1 parent 3d3cf1e commit 37b1ab1
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 36 deletions.
3 changes: 3 additions & 0 deletions changelog/18817.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
migration: allow parallelization of key migration for `vault operator migrate` in order to speed up a migration.
```
54 changes: 37 additions & 17 deletions command/operator_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io/ioutil"
"math"
"net/url"
"os"
"sort"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/mitchellh/cli"
"github.com/pkg/errors"
"github.com/posener/complete"
"golang.org/x/sync/errgroup"
)

var (
Expand All @@ -39,6 +41,7 @@ type OperatorMigrateCommand struct {
flagLogLevel string
flagStart string
flagReset bool
flagMaxParallel int
logger log.Logger
ShutdownCh chan struct{}
}
Expand Down Expand Up @@ -98,6 +101,14 @@ func (c *OperatorMigrateCommand) Flags() *FlagSets {
Usage: "Reset the migration lock. No migration will occur.",
})

f.IntVar(&IntVar{
Name: "max-parallel",
Default: 10,
Target: &c.flagMaxParallel,
Usage: "Specifies the maximum number of parallel migration threads (goroutines) that may be used when migrating. " +
"This can speed up the migration process on slow backends but uses more resources.",
})

f.StringVar(&StringVar{
Name: "log-level",
Target: &c.flagLogLevel,
Expand Down Expand Up @@ -126,7 +137,6 @@ func (c *OperatorMigrateCommand) Run(args []string) int {
c.UI.Error(err.Error())
return 1
}

c.flagLogLevel = strings.ToLower(c.flagLogLevel)
validLevels := []string{"trace", "debug", "info", "warn", "error"}
if !strutil.StrListContains(validLevels, c.flagLogLevel) {
Expand All @@ -135,6 +145,11 @@ func (c *OperatorMigrateCommand) Run(args []string) int {
}
c.logger = logging.NewVaultLogger(log.LevelFromString(c.flagLogLevel))

if c.flagMaxParallel < 1 {
c.UI.Error(fmt.Sprintf("Argument to flag -max-parallel must be between 1 and %d", math.MaxInt))
return 1
}

if c.flagConfig == "" {
c.UI.Error("Must specify exactly one config path using -config")
return 1
Expand Down Expand Up @@ -164,7 +179,7 @@ func (c *OperatorMigrateCommand) Run(args []string) int {
}

// migrate attempts to instantiate the source and destination backends,
// and then invoke the migration the the root of the keyspace.
// and then invoke the migration on the root of the keyspace.
func (c *OperatorMigrateCommand) migrate(config *migratorConfig) error {
from, err := c.newBackend(config.StorageSource.Type, config.StorageSource.Config)
if err != nil {
Expand Down Expand Up @@ -209,7 +224,7 @@ func (c *OperatorMigrateCommand) migrate(config *migratorConfig) error {

doneCh := make(chan error)
go func() {
doneCh <- c.migrateAll(ctx, from, to)
doneCh <- c.migrateAll(ctx, from, to, c.flagMaxParallel)
}()

select {
Expand All @@ -225,8 +240,8 @@ func (c *OperatorMigrateCommand) migrate(config *migratorConfig) error {
}

// migrateAll copies all keys in lexicographic order.
func (c *OperatorMigrateCommand) migrateAll(ctx context.Context, from physical.Backend, to physical.Backend) error {
return dfsScan(ctx, from, func(ctx context.Context, path string) error {
func (c *OperatorMigrateCommand) migrateAll(ctx context.Context, from physical.Backend, to physical.Backend, maxParallel int) error {
return dfsScan(ctx, from, maxParallel, func(ctx context.Context, path string) error {
if path < c.flagStart || path == storageMigrationLock || path == vault.CoreLockPath {
return nil
}
Expand Down Expand Up @@ -365,10 +380,20 @@ func parseStorage(result *migratorConfig, list *ast.ObjectList, name string) err

// dfsScan will invoke cb with every key from source.
// Keys will be traversed in lexicographic, depth-first order.
func dfsScan(ctx context.Context, source physical.Backend, cb func(ctx context.Context, path string) error) error {
func dfsScan(ctx context.Context, source physical.Backend, maxParallel int, cb func(ctx context.Context, path string) error) error {
dfs := []string{""}

eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(maxParallel)

for l := len(dfs); l > 0; l = len(dfs) {
// Check for cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}

key := dfs[len(dfs)-1]
if key == "" || strings.HasSuffix(key, "/") {
children, err := source.List(ctx, key)
Expand All @@ -385,19 +410,14 @@ func dfsScan(ctx context.Context, source physical.Backend, cb func(ctx context.C
}
}
} else {
err := cb(ctx, key)
if err != nil {
return err
}
// Pooling
eg.Go(func() error {
return cb(ctx, key)
})

dfs = dfs[:len(dfs)-1]
}

select {
case <-ctx.Done():
return nil
default:
}
}
return nil

return eg.Wait()
}
121 changes: 102 additions & 19 deletions command/operator_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,20 @@ import (
"bytes"
"context"
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"reflect"
"sort"
"strings"
"sync"
"testing"
"time"

"github.com/go-test/deep"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-secure-stdlib/base62"
"github.com/hashicorp/vault/command/server"
"github.com/hashicorp/vault/helper/testhelpers"
"github.com/hashicorp/vault/sdk/physical"
"github.com/hashicorp/vault/vault"
)
Expand All @@ -35,8 +34,8 @@ func TestMigration(t *testing.T) {

fromFactory := physicalBackends["file"]

folder := filepath.Join(os.TempDir(), testhelpers.RandomWithPrefix("migrator"))
defer os.RemoveAll(folder)
folder := t.TempDir()

confFrom := map[string]string{
"path": folder,
}
Expand All @@ -55,11 +54,10 @@ func TestMigration(t *testing.T) {
if err != nil {
t.Fatal(err)
}

cmd := OperatorMigrateCommand{
logger: log.NewNullLogger(),
}
if err := cmd.migrateAll(context.Background(), from, to); err != nil {
if err := cmd.migrateAll(context.Background(), from, to, 1); err != nil {
t.Fatal(err)
}

Expand All @@ -68,6 +66,44 @@ func TestMigration(t *testing.T) {
}
})

t.Run("Concurrent migration", func(t *testing.T) {
data := generateData()

fromFactory := physicalBackends["file"]

folder := t.TempDir()

confFrom := map[string]string{
"path": folder,
}

from, err := fromFactory(confFrom, nil)
if err != nil {
t.Fatal(err)
}
if err := storeData(from, data); err != nil {
t.Fatal(err)
}

toFactory := physicalBackends["inmem"]
confTo := map[string]string{}
to, err := toFactory(confTo, nil)
if err != nil {
t.Fatal(err)
}

cmd := OperatorMigrateCommand{
logger: log.NewNullLogger(),
}

if err := cmd.migrateAll(context.Background(), from, to, 10); err != nil {
t.Fatal(err)
}
if err := compareStoredData(to, data, ""); err != nil {
t.Fatal(err)
}
})

t.Run("Start option", func(t *testing.T) {
data := generateData()

Expand All @@ -82,8 +118,7 @@ func TestMigration(t *testing.T) {
}

toFactory := physicalBackends["file"]
folder := filepath.Join(os.TempDir(), testhelpers.RandomWithPrefix("migrator"))
defer os.RemoveAll(folder)
folder := t.TempDir()
confTo := map[string]string{
"path": folder,
}
Expand All @@ -99,7 +134,46 @@ func TestMigration(t *testing.T) {
logger: log.NewNullLogger(),
flagStart: start,
}
if err := cmd.migrateAll(context.Background(), from, to); err != nil {
if err := cmd.migrateAll(context.Background(), from, to, 1); err != nil {
t.Fatal(err)
}

if err := compareStoredData(to, data, start); err != nil {
t.Fatal(err)
}
})

t.Run("Start option (parallel)", func(t *testing.T) {
data := generateData()

fromFactory := physicalBackends["inmem"]
confFrom := map[string]string{}
from, err := fromFactory(confFrom, nil)
if err != nil {
t.Fatal(err)
}
if err := storeData(from, data); err != nil {
t.Fatal(err)
}

toFactory := physicalBackends["file"]
folder := t.TempDir()
confTo := map[string]string{
"path": folder,
}

to, err := toFactory(confTo, nil)
if err != nil {
t.Fatal(err)
}

const start = "m"

cmd := OperatorMigrateCommand{
logger: log.NewNullLogger(),
flagStart: start,
}
if err := cmd.migrateAll(context.Background(), from, to, 10); err != nil {
t.Fatal(err)
}

Expand All @@ -110,17 +184,15 @@ func TestMigration(t *testing.T) {

t.Run("Config parsing", func(t *testing.T) {
cmd := new(OperatorMigrateCommand)

cfgName := filepath.Join(os.TempDir(), testhelpers.RandomWithPrefix("migrator"))
ioutil.WriteFile(cfgName, []byte(`
cfgName := filepath.Join(t.TempDir(), "migrator")
os.WriteFile(cfgName, []byte(`
storage_source "src_type" {
path = "src_path"
}
storage_destination "dest_type" {
path = "dest_path"
}`), 0o644)
defer os.Remove(cfgName)

expCfg := &migratorConfig{
StorageSource: &server.Storage{
Expand All @@ -145,7 +217,7 @@ storage_destination "dest_type" {
}

verifyBad := func(cfg string) {
ioutil.WriteFile(cfgName, []byte(cfg), 0o644)
os.WriteFile(cfgName, []byte(cfg), 0o644)
_, err := cmd.loadMigratorConfig(cfgName)
if err == nil {
t.Fatalf("expected error but none received from: %v", cfg)
Expand Down Expand Up @@ -192,6 +264,7 @@ storage_destination "dest_type2" {
path = "dest_path"
}`)
})

t.Run("DFS Scan", func(t *testing.T) {
s, _ := physicalBackends["inmem"](map[string]string{}, nil)

Expand All @@ -204,9 +277,16 @@ storage_destination "dest_type2" {

l := randomLister{s}

var out []string
dfsScan(context.Background(), l, func(ctx context.Context, path string) error {
out = append(out, path)
type SafeAppend struct {
out []string
lock sync.Mutex
}
outKeys := SafeAppend{}
dfsScan(context.Background(), l, 10, func(ctx context.Context, path string) error {
outKeys.lock.Lock()
defer outKeys.lock.Unlock()

outKeys.out = append(outKeys.out, path)
return nil
})

Expand All @@ -218,8 +298,11 @@ storage_destination "dest_type2" {
keys = append(keys, key)
}
sort.Strings(keys)
if !reflect.DeepEqual(keys, out) {
t.Fatalf("expected equal: %v, %v", keys, out)
outKeys.lock.Lock()
sort.Strings(outKeys.out)
outKeys.lock.Unlock()
if !reflect.DeepEqual(keys, outKeys.out) {
t.Fatalf("expected equal: %v, %v", keys, outKeys.out)
}
})
}
Expand Down
8 changes: 8 additions & 0 deletions website/content/docs/commands/operator/migrate.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,11 @@ The following flags are available for the `operator migrate` command.
- `-reset` - Reset the migration lock. A lock file is added during migration to prevent
starting the Vault server or another migration. The `-reset` option can be used to
remove a stale lock file if present.

- `-max-parallel` `int: 10` - Allows the operator to specify the maximum number of lightweight threads (goroutines)
which may be used to migrate data in parallel. This can potentially speed up migration on slower backends at
the cost of more resources (e.g. CPU, memory). Permitted values range from `1` (synchronous) to the
platform-dependent maximum `int` value. If not supplied, a default of `10` parallel goroutines is used.

~> Note: The number of concurrent requests a storage backend will actually handle is ultimately governed by
the storage backend's own `max_parallel` configuration setting, so a high `-max-parallel` value may not be honored.

0 comments on commit 37b1ab1

Please sign in to comment.