Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion go/vt/mysqlctl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestFindFilesToBackup(t *testing.T) {
DataDir: dataDir,
}

result, err := findFilesToBackup(cnf)
result, totalSize, err := findFilesToBackup(cnf)
if err != nil {
t.Fatalf("findFilesToBackup failed: %v", err)
}
Expand Down Expand Up @@ -112,6 +112,9 @@ func TestFindFilesToBackup(t *testing.T) {
if !reflect.DeepEqual(result, expected) {
t.Fatalf("got wrong list of FileEntry %v, expected %v", result, expected)
}
if totalSize <= 0 {
t.Fatalf("backup size should be > 0, got %v", totalSize)
}
}

type forTest []FileEntry
Expand Down
119 changes: 119 additions & 0 deletions go/vt/mysqlctl/backupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"strings"
"time"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -291,3 +294,119 @@ func RestoreWasInterrupted(cnf *Mycnf) bool {
func GetBackupDir(keyspace, shard string) string {
return fmt.Sprintf("%v/%v", keyspace, shard)
}

// isDbDir returns true if the given directory contains a DB
func isDbDir(p string) bool {
// db.opt is there
if _, err := os.Stat(path.Join(p, "db.opt")); err == nil {
return true
}

// Look for at least one database file
fis, err := ioutil.ReadDir(p)
if err != nil {
return false
}
for _, fi := range fis {
if strings.HasSuffix(fi.Name(), ".frm") {
return true
}

// the MyRocks engine stores data in RocksDB .sst files
// https://github.com/facebook/rocksdb/wiki/Rocksdb-BlockBasedTable-Format
if strings.HasSuffix(fi.Name(), ".sst") {
return true
}

// .frm files were removed in MySQL 8, so we need to check for two other file types
// https://dev.mysql.com/doc/refman/8.0/en/data-dictionary-file-removal.html
if strings.HasSuffix(fi.Name(), ".ibd") {
return true
}
// https://dev.mysql.com/doc/refman/8.0/en/serialized-dictionary-information.html
if strings.HasSuffix(fi.Name(), ".sdi") {
return true
}
}

return false
}

func addDirectory(fes []FileEntry, base string, baseDir string, subDir string) ([]FileEntry, int64, error) {
p := path.Join(baseDir, subDir)
var size int64

fis, err := ioutil.ReadDir(p)
if err != nil {
return nil, 0, err
}
for _, fi := range fis {
fes = append(fes, FileEntry{
Base: base,
Name: path.Join(subDir, fi.Name()),
})
size = size + fi.Size()
}
return fes, size, nil
}

// addMySQL8DataDictionary checks to see if the new data dictionary introduced in MySQL 8 exists
// and adds it to the backup manifest if it does
// https://dev.mysql.com/doc/refman/8.0/en/data-dictionary-transactional-storage.html
func addMySQL8DataDictionary(fes []FileEntry, base string, baseDir string) ([]FileEntry, int64, error) {
filePath := path.Join(baseDir, dataDictionaryFile)

// no-op if this file doesn't exist
fi, err := os.Stat(filePath)
if os.IsNotExist(err) {
return fes, 0, nil
}

fes = append(fes, FileEntry{
Base: base,
Name: dataDictionaryFile,
})

return fes, fi.Size(), nil
}

func findFilesToBackup(cnf *Mycnf) ([]FileEntry, int64, error) {
var err error
var result []FileEntry
var totalSize int64

// first add inno db files
result, totalSize, err = addDirectory(result, backupInnodbDataHomeDir, cnf.InnodbDataHomeDir, "")
if err != nil {
return nil, 0, err
}
result, size, err := addDirectory(result, backupInnodbLogGroupHomeDir, cnf.InnodbLogGroupHomeDir, "")
if err != nil {
return nil, 0, err
}
totalSize = totalSize + size
// then add the transactional data dictionary if it exists
result, size, err = addMySQL8DataDictionary(result, backupData, cnf.DataDir)
if err != nil {
return nil, 0, err
}
totalSize = totalSize + size

// then add DB directories
fis, err := ioutil.ReadDir(cnf.DataDir)
if err != nil {
return nil, 0, err
}

for _, fi := range fis {
p := path.Join(cnf.DataDir, fi.Name())
if isDbDir(p) {
result, size, err = addDirectory(result, backupData, cnf.DataDir, fi.Name())
if err != nil {
return nil, 0, err
}
totalSize = totalSize + size
}
}
return result, totalSize, nil
}
2 changes: 2 additions & 0 deletions go/vt/mysqlctl/backupstorage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type BackupHandle interface {
// multiple go routines once a backup has been started.
// The context is valid for the duration of the writes, until the
// WriteCloser is closed.
// filesize should not be treated as an exact value but rather
// as an approximate value
AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error)

// EndBackup stops and closes a backup. The contents should be kept.
Expand Down
115 changes: 2 additions & 113 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -126,116 +124,6 @@ func (fe *FileEntry) open(cnf *Mycnf, readOnly bool) (*os.File, error) {
return fd, nil
}

// isDbDir returns true if the given directory contains a DB
func isDbDir(p string) bool {
// db.opt is there
if _, err := os.Stat(path.Join(p, "db.opt")); err == nil {
return true
}

// Look for at least one database file
fis, err := ioutil.ReadDir(p)
if err != nil {
return false
}
for _, fi := range fis {
if strings.HasSuffix(fi.Name(), ".frm") {
return true
}

// the MyRocks engine stores data in RocksDB .sst files
// https://github.com/facebook/rocksdb/wiki/Rocksdb-BlockBasedTable-Format
if strings.HasSuffix(fi.Name(), ".sst") {
return true
}

// .frm files were removed in MySQL 8, so we need to check for two other file types
// https://dev.mysql.com/doc/refman/8.0/en/data-dictionary-file-removal.html
if strings.HasSuffix(fi.Name(), ".ibd") {
return true
}
// https://dev.mysql.com/doc/refman/8.0/en/serialized-dictionary-information.html
if strings.HasSuffix(fi.Name(), ".sdi") {
return true
}
}

return false
}

func addDirectory(fes []FileEntry, base string, baseDir string, subDir string) ([]FileEntry, error) {
p := path.Join(baseDir, subDir)

fis, err := ioutil.ReadDir(p)
if err != nil {
return nil, err
}
for _, fi := range fis {
fes = append(fes, FileEntry{
Base: base,
Name: path.Join(subDir, fi.Name()),
})
}
return fes, nil
}

// addMySQL8DataDictionary checks to see if the new data dictionary introduced in MySQL 8 exists
// and adds it to the backup manifest if it does
// https://dev.mysql.com/doc/refman/8.0/en/data-dictionary-transactional-storage.html
func addMySQL8DataDictionary(fes []FileEntry, base string, baseDir string) ([]FileEntry, error) {
filePath := path.Join(baseDir, dataDictionaryFile)

// no-op if this file doesn't exist
if _, err := os.Stat(filePath); os.IsNotExist(err) {
return fes, nil
}

fes = append(fes, FileEntry{
Base: base,
Name: dataDictionaryFile,
})

return fes, nil
}

func findFilesToBackup(cnf *Mycnf) ([]FileEntry, error) {
var err error
var result []FileEntry

// first add inno db files
result, err = addDirectory(result, backupInnodbDataHomeDir, cnf.InnodbDataHomeDir, "")
if err != nil {
return nil, err
}
result, err = addDirectory(result, backupInnodbLogGroupHomeDir, cnf.InnodbLogGroupHomeDir, "")
if err != nil {
return nil, err
}

// then add the transactional data dictionary if it exists
result, err = addMySQL8DataDictionary(result, backupData, cnf.DataDir)
if err != nil {
return nil, err
}

// then add DB directories
fis, err := ioutil.ReadDir(cnf.DataDir)
if err != nil {
return nil, err
}

for _, fi := range fis {
p := path.Join(cnf.DataDir, fi.Name())
if isDbDir(p) {
result, err = addDirectory(result, backupData, cnf.DataDir, fi.Name())
if err != nil {
return nil, err
}
}
}
return result, nil
}

// ExecuteBackup returns a boolean that indicates if the backup is usable,
// and an overall error.
func (be *BuiltinBackupEngine) ExecuteBackup(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle) (bool, error) {
Expand Down Expand Up @@ -379,7 +267,8 @@ func (be *BuiltinBackupEngine) ExecuteBackup(ctx context.Context, params BackupP
func (be *BuiltinBackupEngine) backupFiles(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle, replicationPosition mysql.Position) (finalErr error) {

// Get the files to backup.
fes, err := findFilesToBackup(params.Cnf)
// We don't care about totalSize because we add each file separately.
fes, _, err := findFilesToBackup(params.Cnf)
if err != nil {
return vterrors.Wrap(err, "can't find files to backup")
}
Expand Down
24 changes: 18 additions & 6 deletions go/vt/mysqlctl/xtrabackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (be *XtrabackupEngine) backupFiles(ctx context.Context, params BackupParams
// a timeout on the final Close() step.
addFilesCtx, cancelAddFiles := context.WithCancel(ctx)
defer cancelAddFiles()
destFiles, err := addStripeFiles(addFilesCtx, bh, backupFileName, numStripes, params.Logger)
destFiles, err := addStripeFiles(addFilesCtx, params, bh, backupFileName, numStripes)
if err != nil {
return replicationPosition, vterrors.Wrapf(err, "cannot create backup file %v", backupFileName)
}
Expand Down Expand Up @@ -647,23 +647,35 @@ func stripeFileName(baseFileName string, index int) string {
return fmt.Sprintf("%s-%03d", baseFileName, index)
}

func addStripeFiles(ctx context.Context, backupHandle backupstorage.BackupHandle, baseFileName string, numStripes int, logger logutil.Logger) ([]io.WriteCloser, error) {
func addStripeFiles(ctx context.Context, params BackupParams, backupHandle backupstorage.BackupHandle, baseFileName string, numStripes int) ([]io.WriteCloser, error) {
// Compute total size of all files we will backup.
// We delegate the actual backing up to xtrabackup which streams
// the files as a single archive (tar / xbstream), which might
// further be compressed using gzip.
// This approximate total size is passed in to AddFile so that
// storage plugins can make appropriate choices for parameters
// like partSize in multi-part uploads
_, totalSize, err := findFilesToBackup(params.Cnf)
if err != nil {
return nil, err
}

if numStripes <= 1 {
// No striping.
file, err := backupHandle.AddFile(ctx, baseFileName, 0)
file, err := backupHandle.AddFile(ctx, baseFileName, totalSize)
return []io.WriteCloser{file}, err
}

files := []io.WriteCloser{}
for i := 0; i < numStripes; i++ {
filename := stripeFileName(baseFileName, i)
logger.Infof("Opening backup stripe file %v", filename)
file, err := backupHandle.AddFile(ctx, filename, 0)
params.Logger.Infof("Opening backup stripe file %v", filename)
file, err := backupHandle.AddFile(ctx, filename, totalSize/int64(numStripes))
if err != nil {
// Close any files we already opened and clear them from the result.
for _, file := range files {
if err := file.Close(); err != nil {
logger.Warningf("error closing backup stripe file: %v", err)
params.Logger.Warningf("error closing backup stripe file: %v", err)
}
}
return nil, err
Expand Down