Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions go/vt/mysqlctl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func backupFiles(ctx context.Context, mysqld MysqlDaemon, logger logutil.Logger,
}

// open the MANIFEST
wc, err := bh.AddFile(ctx, backupManifest)
wc, err := bh.AddFile(ctx, backupManifest, 0)
if err != nil {
return fmt.Errorf("cannot add %v to backup: %v", backupManifest, err)
}
Expand Down Expand Up @@ -440,8 +440,13 @@ func backupFile(ctx context.Context, mysqld MysqlDaemon, logger logutil.Logger,
}
defer source.Close()

fi, err := source.Stat()
if err != nil {
return err
}

// Open the destination file for writing, and a buffer.
wc, err := bh.AddFile(ctx, name)
wc, err := bh.AddFile(ctx, name, fi.Size())
if err != nil {
return fmt.Errorf("cannot add file: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/backupstorage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type BackupHandle interface {
// multiple go routines once a backup has been started.
// The context is valid for the duration of the writes, until the
// WriteCloser is closed.
AddFile(ctx context.Context, filename string) (io.WriteCloser, error)
AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error)

// EndBackup stops and closes a backup. The contents should be kept.
// Only works for read-write backups (created by StartBackup).
Expand Down
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/cephbackupstorage/ceph.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (bh *CephBackupHandle) Name() string {
}

// AddFile implements BackupHandle.
func (bh *CephBackupHandle) AddFile(ctx context.Context, filename string) (io.WriteCloser, error) {
func (bh *CephBackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) {
if bh.readOnly {
return nil, fmt.Errorf("AddFile cannot be called on read-only backup")
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/filebackupstorage/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (fbh *FileBackupHandle) Name() string {
}

// AddFile is part of the BackupHandle interface
func (fbh *FileBackupHandle) AddFile(ctx context.Context, filename string) (io.WriteCloser, error) {
func (fbh *FileBackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) {
if fbh.readOnly {
return nil, fmt.Errorf("AddFile cannot be called on read-only backup")
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/mysqlctl/filebackupstorage/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestListBackups(t *testing.T) {
}

// check we cannot change a backup we listed
if _, err := bhs[0].AddFile(ctx, "test"); err == nil {
if _, err := bhs[0].AddFile(ctx, "test", 0); err == nil {
t.Fatalf("was able to AddFile to read-only backup")
}
if err := bhs[0].EndBackup(ctx); err == nil {
Expand All @@ -166,7 +166,7 @@ func TestFileContents(t *testing.T) {
if err != nil {
t.Fatalf("fbs.StartBackup failed: %v", err)
}
wc, err := bh.AddFile(ctx, filename1)
wc, err := bh.AddFile(ctx, filename1, 0)
if err != nil {
t.Fatalf("bh.AddFile failed: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/gcsbackupstorage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (bh *GCSBackupHandle) Name() string {
}

// AddFile implements BackupHandle.
func (bh *GCSBackupHandle) AddFile(ctx context.Context, filename string) (io.WriteCloser, error) {
func (bh *GCSBackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) {
if bh.readOnly {
return nil, fmt.Errorf("AddFile cannot be called on read-only backup")
}
Expand Down
18 changes: 16 additions & 2 deletions go/vt/mysqlctl/s3backupstorage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"flag"
"fmt"
"io"
"math"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -80,17 +81,30 @@ func (bh *S3BackupHandle) Name() string {
}

// AddFile is part of the backupstorage.BackupHandle interface.
func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string) (io.WriteCloser, error) {
func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) {
if bh.readOnly {
return nil, fmt.Errorf("AddFile cannot be called on read-only backup")
}

// Calculate s3 upload part size using the source filesize
partSizeMB := s3manager.DefaultUploadPartSize
if filesize > 0 {
minimumPartSize := float64(filesize) / float64(s3manager.MaxUploadParts)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about dynamically changing the part size without specifying a max limit. The problem with tuning the part size is now you have to be able to fit that in memory -- multiplied by the concurrency. So I think there should be a flag for the max part size and a flag for max upload parts. Using those 2 along with concurrency, someone can tune the memory footprint of their backups

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given a 120gb table, that would be 12mb parts. For that to have any kind of significant impact you would have to set concurrency pretty high.

I didn't think this change would affect anyone, given that tables under 50gb would just be using the same 5mb default part size as before and tables over 50gb currently don't even upload.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd definitely like to hear what others have to say about their existing concurrency settings etc.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to add, I was also cautious of making a blanket part size flag because then that would create large upload buffers for files that are very small. I really wanted to use a larger part size only for files that needed it and leave the rest to the default size of 5mb.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like MaxUploadParts is not changeable, so this is probably the only course of action. 🤞

// Round up so the part size is large enough to upload the whole file
// within MaxUploadParts. NOTE(review): PartSize is in bytes, not MB, and
// `minimumPartSize / 1024 * 1024` evaluates left-to-right so the two
// factors cancel — this is just math.Ceil(minimumPartSize); write it that
// way (and rename partSizeMB to partSizeBytes) to avoid confusion.
calculatedPartSizeMB := int64(math.Ceil(minimumPartSize / 1024 * 1024))
if calculatedPartSizeMB > partSizeMB {
partSizeMB = calculatedPartSizeMB
}
}

reader, writer := io.Pipe()
bh.waitGroup.Add(1)

go func() {
defer bh.waitGroup.Done()
uploader := s3manager.NewUploaderWithClient(bh.client)
uploader := s3manager.NewUploaderWithClient(bh.client, func(u *s3manager.Uploader) {
u.PartSize = partSizeMB
})
object := objName(bh.dir, bh.name, filename)

var sseOption *string
Expand Down