Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions go/vt/vtctl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ limitations under the License.
package vtctl

import (
"errors"
"flag"
"fmt"
"io"

"golang.org/x/net/context"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/wrangler"
)
Expand All @@ -34,19 +36,118 @@ func init() {
commandListBackups,
"<keyspace/shard>",
"Lists all the backups for a shard."})
addCommand("Shards", command{
"BackupShard",
commandBackupShard,
"<keyspace/shard>",
"Chooses a tablet and creates a backup for a shard."})
addCommand("Shards", command{
"RemoveBackup",
commandRemoveBackup,
"<keyspace/shard> <backup name>",
"Removes a backup for the BackupStorage."})

addCommand("Tablets", command{
"Backup",
commandBackup,
"[-concurrency=4] <tablet alias>",
"Stops mysqld and uses the BackupStorage service to store a new backup. This function also remembers if the tablet was replicating so that it can restore the same state after the backup completes."})
addCommand("Tablets", command{
"RestoreFromBackup",
commandRestoreFromBackup,
"<tablet alias>",
"Stops mysqld and restores the data from the latest backup."})
}

func commandBackup(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
concurrency := subFlags.Int("concurrency", 4, "Specifies the number of compression/checksum jobs to run simultaneously")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 1 {
return fmt.Errorf("the Backup command requires the <tablet alias> argument")
}

tabletAlias, err := topoproto.ParseTabletAlias(subFlags.Arg(0))
if err != nil {
return err
}
tabletInfo, err := wr.TopoServer().GetTablet(ctx, tabletAlias)
if err != nil {
return err
}

return execBackup(ctx, wr, tabletInfo.Tablet, *concurrency)
}

func commandBackupShard(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
concurrency := subFlags.Int("concurrency", 4, "Specifies the number of compression/checksum jobs to run simultaneously")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 1 {
return fmt.Errorf("action BackupShard requires <keyspace/shard>")
}

keyspace, shard, err := topoproto.ParseKeyspaceShard(subFlags.Arg(0))
if err != nil {
return err
}

tablets, stats, err := wr.ShardReplicationStatuses(ctx, keyspace, shard)
if tablets == nil {
return err
}

var tabletForBackup *topodatapb.Tablet
var secondsBehind uint32

for i := range tablets {
// don't run a backup on a non-slave type
if !tablets[i].IsSlaveType() {
continue
}

// choose the first tablet as the baseline
if tabletForBackup == nil {
tabletForBackup = tablets[i].Tablet
secondsBehind = stats[i].SecondsBehindMaster
continue
}

// choose a new tablet if it is more up to date
if stats[i].SecondsBehindMaster < secondsBehind {
tabletForBackup = tablets[i].Tablet
secondsBehind = stats[i].SecondsBehindMaster
}
}

if tabletForBackup == nil {
return errors.New("no tablet available for backup")
}

return execBackup(ctx, wr, tabletForBackup, *concurrency)
}

// execBackup is shared by Backup and BackupShard
func execBackup(ctx context.Context, wr *wrangler.Wrangler, tablet *topodatapb.Tablet, concurrency int) error {
stream, err := wr.TabletManagerClient().Backup(ctx, tablet, concurrency)
if err != nil {
return err
}
for {
e, err := stream.Recv()
switch err {
case nil:
logutil.LogEvent(wr.Logger(), e)
case io.EOF:
return nil
default:
return err
}
}
}

func commandListBackups(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
if err := subFlags.Parse(args); err != nil {
return err
Expand Down
38 changes: 0 additions & 38 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ import (
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"sort"
"strconv"
Expand Down Expand Up @@ -205,9 +204,6 @@ var commands = []commandGroup{
{"Sleep", commandSleep,
"<tablet alias> <duration>",
"Blocks the action queue on the specified tablet for the specified amount of time. This is typically used for testing."},
{"Backup", commandBackup,
"[-concurrency=4] <tablet alias>",
"Stops mysqld and uses the BackupStorage service to store a new backup. This function also remembers if the tablet was replicating so that it can restore the same state after the backup completes."},
{"ExecuteHook", commandExecuteHook,
"<tablet alias> <hook name> [<param1=value1> <param2=value2> ...]",
"Runs the specified hook on the given tablet. A hook is a script that resides in the $VTROOT/vthook directory. You can put any script into that directory and use this command to run that script.\n" +
Expand Down Expand Up @@ -1030,40 +1026,6 @@ func commandSleep(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fla
return wr.TabletManagerClient().Sleep(ctx, ti.Tablet, duration)
}

func commandBackup(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
concurrency := subFlags.Int("concurrency", 4, "Specifies the number of compression/checksum jobs to run simultaneously")
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 1 {
return fmt.Errorf("the Backup command requires the <tablet alias> argument")
}

tabletAlias, err := topoproto.ParseTabletAlias(subFlags.Arg(0))
if err != nil {
return err
}
tabletInfo, err := wr.TopoServer().GetTablet(ctx, tabletAlias)
if err != nil {
return err
}
stream, err := wr.TabletManagerClient().Backup(ctx, tabletInfo.Tablet, *concurrency)
if err != nil {
return err
}
for {
e, err := stream.Recv()
switch err {
case nil:
logutil.LogEvent(wr.Logger(), e)
case io.EOF:
return nil
default:
return err
}
}
}

func commandExecuteFetchAsDba(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
maxRows := subFlags.Int("max_rows", 10000, "Specifies the maximum number of rows to allow in reset")
disableBinlogs := subFlags.Bool("disable_binlogs", false, "Disables writing to binlogs during the query")
Expand Down