From 6cad756d7b6581b17e0ff74d16dd066d43dff14a Mon Sep 17 00:00:00 2001 From: Harrison McGonigal Date: Tue, 17 Apr 2018 15:14:22 -0400 Subject: [PATCH] dynamically calculate s3 upload partsize for backups Signed-off-by: Harrison McGonigal --- go/vt/mysqlctl/backup.go | 9 +++++++-- go/vt/mysqlctl/backupstorage/interface.go | 2 +- go/vt/mysqlctl/cephbackupstorage/ceph.go | 2 +- go/vt/mysqlctl/filebackupstorage/file.go | 2 +- go/vt/mysqlctl/filebackupstorage/file_test.go | 4 ++-- go/vt/mysqlctl/gcsbackupstorage/gcs.go | 2 +- go/vt/mysqlctl/s3backupstorage/s3.go | 18 ++++++++++++++++-- 7 files changed, 29 insertions(+), 10 deletions(-) diff --git a/go/vt/mysqlctl/backup.go b/go/vt/mysqlctl/backup.go index ea4e195a445..64eae6ea78a 100644 --- a/go/vt/mysqlctl/backup.go +++ b/go/vt/mysqlctl/backup.go @@ -402,7 +402,7 @@ func backupFiles(ctx context.Context, mysqld MysqlDaemon, logger logutil.Logger, } // open the MANIFEST - wc, err := bh.AddFile(ctx, backupManifest) + wc, err := bh.AddFile(ctx, backupManifest, 0) if err != nil { return fmt.Errorf("cannot add %v to backup: %v", backupManifest, err) } @@ -440,8 +440,13 @@ func backupFile(ctx context.Context, mysqld MysqlDaemon, logger logutil.Logger, } defer source.Close() + fi, err := source.Stat() + if err != nil { + return err + } + // Open the destination file for writing, and a buffer. - wc, err := bh.AddFile(ctx, name) + wc, err := bh.AddFile(ctx, name, fi.Size()) if err != nil { return fmt.Errorf("cannot add file: %v", err) } diff --git a/go/vt/mysqlctl/backupstorage/interface.go b/go/vt/mysqlctl/backupstorage/interface.go index 7d8e55ee936..e6dacb05177 100644 --- a/go/vt/mysqlctl/backupstorage/interface.go +++ b/go/vt/mysqlctl/backupstorage/interface.go @@ -49,7 +49,7 @@ type BackupHandle interface { // multiple go routines once a backup has been started. // The context is valid for the duration of the writes, until the // WriteCloser is closed. 
- AddFile(ctx context.Context, filename string) (io.WriteCloser, error) + AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) // EndBackup stops and closes a backup. The contents should be kept. // Only works for read-write backups (created by StartBackup). diff --git a/go/vt/mysqlctl/cephbackupstorage/ceph.go b/go/vt/mysqlctl/cephbackupstorage/ceph.go index 8d7d05c5537..5c0b616625e 100644 --- a/go/vt/mysqlctl/cephbackupstorage/ceph.go +++ b/go/vt/mysqlctl/cephbackupstorage/ceph.go @@ -73,7 +73,7 @@ func (bh *CephBackupHandle) Name() string { } // AddFile implements BackupHandle. -func (bh *CephBackupHandle) AddFile(ctx context.Context, filename string) (io.WriteCloser, error) { +func (bh *CephBackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) { if bh.readOnly { return nil, fmt.Errorf("AddFile cannot be called on read-only backup") } diff --git a/go/vt/mysqlctl/filebackupstorage/file.go b/go/vt/mysqlctl/filebackupstorage/file.go index 52f10707f0b..d2a93ea0ddc 100644 --- a/go/vt/mysqlctl/filebackupstorage/file.go +++ b/go/vt/mysqlctl/filebackupstorage/file.go @@ -56,7 +56,7 @@ func (fbh *FileBackupHandle) Name() string { } // AddFile is part of the BackupHandle interface -func (fbh *FileBackupHandle) AddFile(ctx context.Context, filename string) (io.WriteCloser, error) { +func (fbh *FileBackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) { if fbh.readOnly { return nil, fmt.Errorf("AddFile cannot be called on read-only backup") } diff --git a/go/vt/mysqlctl/filebackupstorage/file_test.go b/go/vt/mysqlctl/filebackupstorage/file_test.go index 41718e8f7fa..fe23d949fbc 100644 --- a/go/vt/mysqlctl/filebackupstorage/file_test.go +++ b/go/vt/mysqlctl/filebackupstorage/file_test.go @@ -140,7 +140,7 @@ func TestListBackups(t *testing.T) { } // check we cannot chaneg a backup we listed - if _, err := bhs[0].AddFile(ctx, "test"); err == nil { + if 
_, err := bhs[0].AddFile(ctx, "test", 0); err == nil { t.Fatalf("was able to AddFile to read-only backup") } if err := bhs[0].EndBackup(ctx); err == nil { @@ -166,7 +166,7 @@ func TestFileContents(t *testing.T) { if err != nil { t.Fatalf("fbs.StartBackup failed: %v", err) } - wc, err := bh.AddFile(ctx, filename1) + wc, err := bh.AddFile(ctx, filename1, 0) if err != nil { t.Fatalf("bh.AddFile failed: %v", err) } diff --git a/go/vt/mysqlctl/gcsbackupstorage/gcs.go b/go/vt/mysqlctl/gcsbackupstorage/gcs.go index d62a2c21a00..b6fbcfdc69c 100644 --- a/go/vt/mysqlctl/gcsbackupstorage/gcs.go +++ b/go/vt/mysqlctl/gcsbackupstorage/gcs.go @@ -66,7 +66,7 @@ func (bh *GCSBackupHandle) Name() string { } // AddFile implements BackupHandle. -func (bh *GCSBackupHandle) AddFile(ctx context.Context, filename string) (io.WriteCloser, error) { +func (bh *GCSBackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) { if bh.readOnly { return nil, fmt.Errorf("AddFile cannot be called on read-only backup") } diff --git a/go/vt/mysqlctl/s3backupstorage/s3.go b/go/vt/mysqlctl/s3backupstorage/s3.go index f9436f240a7..03bdd41b72b 100644 --- a/go/vt/mysqlctl/s3backupstorage/s3.go +++ b/go/vt/mysqlctl/s3backupstorage/s3.go @@ -27,6 +27,7 @@ import ( "flag" "fmt" "io" + "math" "sort" "strings" "sync" @@ -80,17 +81,30 @@ func (bh *S3BackupHandle) Name() string { } // AddFile is part of the backupstorage.BackupHandle interface. 
-func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string) (io.WriteCloser, error) { +func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) { if bh.readOnly { return nil, fmt.Errorf("AddFile cannot be called on read-only backup") } + // Calculate s3 upload part size using the source filesize + partSizeBytes := s3manager.DefaultUploadPartSize + if filesize > 0 { + minimumPartSize := float64(filesize) / float64(s3manager.MaxUploadParts) + // Round up to ensure large enough partsize (PartSize is in bytes) + calculatedPartSizeBytes := int64(math.Ceil(minimumPartSize)) + if calculatedPartSizeBytes > partSizeBytes { + partSizeBytes = calculatedPartSizeBytes + } + } + reader, writer := io.Pipe() bh.waitGroup.Add(1) go func() { defer bh.waitGroup.Done() - uploader := s3manager.NewUploaderWithClient(bh.client) + uploader := s3manager.NewUploaderWithClient(bh.client, func(u *s3manager.Uploader) { + u.PartSize = partSizeBytes + }) object := objName(bh.dir, bh.name, filename) var sseOption *string