From 18cdd551006b635370d114cf1a1753bdd120c5b1 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 18 Jul 2023 16:34:37 +0300 Subject: [PATCH 01/14] incremental restore: take a backup manifest's PurgedGTIDs into account Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/mysqlctl/backupengine.go | 25 +++++++++++++++ go/vt/mysqlctl/binlogs_gtid.go | 14 +++++++-- go/vt/mysqlctl/binlogs_gtid_test.go | 40 ++++++++++++++++++++++++ go/vt/mysqlctl/builtinbackupengine.go | 44 +++++++++++---------------- 4 files changed, 93 insertions(+), 30 deletions(-) diff --git a/go/vt/mysqlctl/backupengine.go b/go/vt/mysqlctl/backupengine.go index fae7a10d0f5..d2f2fb66d11 100644 --- a/go/vt/mysqlctl/backupengine.go +++ b/go/vt/mysqlctl/backupengine.go @@ -391,6 +391,31 @@ func FindLatestSuccessfulBackup(ctx context.Context, logger logutil.Logger, bhs return nil, nil, ErrNoCompleteBackup } +// FindLatestSuccessfulBackupPosition returns the position of the last known successful backup +func FindLatestSuccessfulBackupPosition(ctx context.Context, params BackupParams) (pos mysql.Position, err error) { + bs, err := backupstorage.GetBackupStorage() + if err != nil { + return pos, err + } + defer bs.Close() + + // Backups are stored in a directory structure that starts with + // / + backupDir := GetBackupDir(params.Keyspace, params.Shard) + bhs, err := bs.ListBackups(ctx, backupDir) + if err != nil { + return pos, vterrors.Wrap(err, "ListBackups failed") + } + _, manifest, err := FindLatestSuccessfulBackup(ctx, params.Logger, bhs) + if err != nil { + return pos, vterrors.Wrap(err, "FindLatestSuccessfulBackup failed") + } + pos = manifest.Position + // For restore purposes, and for all we care, the backup position should also include the gtid_purged + pos.GTIDSet = pos.GTIDSet.Union(manifest.PurgedPosition.GTIDSet) + return pos, nil +} + // FindBackupToRestore returns a path, a sequence of backup handles, to be 
restored. // The returned handles stand for valid backups with complete manifests. func FindBackupToRestore(ctx context.Context, params RestoreParams, bhs []backupstorage.BackupHandle) (*RestorePath, error) { diff --git a/go/vt/mysqlctl/binlogs_gtid.go b/go/vt/mysqlctl/binlogs_gtid.go index 70f734b7cae..185aedc89f9 100644 --- a/go/vt/mysqlctl/binlogs_gtid.go +++ b/go/vt/mysqlctl/binlogs_gtid.go @@ -93,13 +93,21 @@ func ChooseBinlogsForIncrementalBackup( // know this when we look into the _next_ binlog file's Previous-GTIDs. continue } + // Got here? This means backupFromGTIDSet does not fully contain the current binlog's Previous-GTIDs. + // In other words, Previous-GTIDs have entries on top of backupFromGTIDSet. Which suggests that these + // entries were added by the previous binary log. if i == 0 { + // Ummm... there _is no_ previous binary log. return nil, "", "", vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "Required entries have been purged. Oldest binary log %v expects entries not found in backup pos. Expected pos=%v", binlog, previousGTIDsPos) } - if !prevGTIDsUnion.Union(purgedGTIDSet).Contains(backupFromGTIDSet) { + // The other thing to validate, is that we can't allow a situation where the backup-GTIDs have entries not covered + // by our binary log's Previous-GTIDs (padded with purged GTIDs). Because that means we can't possibly restore to + // such position. + prevGTIDsUnionPurged := prevGTIDsUnion.Union(purgedGTIDSet) + if !prevGTIDsUnionPurged.Contains(backupFromGTIDSet) { return nil, "", "", vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, - "Mismatching GTID entries. Requested backup pos has entries not found in the binary logs, and binary logs have entries not found in the requested backup pos. Neither fully contains the other. Requested pos=%v, binlog pos=%v", - backupFromGTIDSet, previousGTIDsPos.GTIDSet) + "Mismatching GTID entries. 
Requested backup pos has entries not found in the binary logs, and binary logs have entries not found in the requested backup pos. Neither fully contains the other.\n- Requested pos=%v\n- binlog pos=%v\n- purgedGTIDSet=%v\n- union=%v\n- union purged=%v", + backupFromGTIDSet, previousGTIDsPos.GTIDSet, purgedGTIDSet, prevGTIDsUnion, prevGTIDsUnionPurged) } // We begin with the previous binary log, and we ignore the last binary log, because it's still open and being written to. binaryLogsToBackup = binaryLogs[i-1 : len(binaryLogs)-1] diff --git a/go/vt/mysqlctl/binlogs_gtid_test.go b/go/vt/mysqlctl/binlogs_gtid_test.go index c39047d10ea..8afbf21b529 100644 --- a/go/vt/mysqlctl/binlogs_gtid_test.go +++ b/go/vt/mysqlctl/binlogs_gtid_test.go @@ -111,6 +111,46 @@ func TestChooseBinlogsForIncrementalBackup(t *testing.T) { backupPos: "16b1039f-0000-0000-0000-000000000000:1-63", expectError: "Mismatching GTID entries", }, + { + name: "empty previous GTIDs in first binlog with gap, with good backup pos", + previousGTIDs: map[string]string{ + "vt-bin.000001": "", + "vt-bin.000002": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-60", + "vt-bin.000003": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-60", + "vt-bin.000004": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-78", + "vt-bin.000005": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-243", + "vt-bin.000006": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-331", + }, + backupPos: "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-78", + expectBinlogs: []string{"vt-bin.000004", "vt-bin.000005"}, + }, + { + name: "empty previous GTIDs in first binlog with gap, and without gtid_purged", + previousGTIDs: map[string]string{ + "vt-bin.000001": "", + "vt-bin.000002": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-60", + "vt-bin.000003": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-60", + "vt-bin.000004": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-78", + "vt-bin.000005": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-243", + "vt-bin.000006": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-331", + 
}, + backupPos: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-78", + expectError: "Mismatching GTID entries", + }, + { + name: "empty previous GTIDs in first binlog but with proper gtid_purged", + previousGTIDs: map[string]string{ + "vt-bin.000001": "", + "vt-bin.000002": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-60", + "vt-bin.000003": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-60", + "vt-bin.000004": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-78", + "vt-bin.000005": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-243", + "vt-bin.000006": "16b1039f-22b6-11ed-b765-0a43f95f28a3:40-331", + }, + backupPos: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-78", + gtidPurged: "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-40", + expectBinlogs: []string{"vt-bin.000004", "vt-bin.000005"}, + }, { name: "empty previous GTIDs in first binlog covering backup pos", previousGTIDs: map[string]string{ diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index 2ecad7a8934..fd66d236add 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -239,6 +239,16 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par return false, vterrors.Wrap(err, "can't get MySQL version") } + if params.IncrementalFromPos == autoIncrementalFromPos { + params.Logger.Infof("auto evaluating incremental_from_pos") + pos, err := FindLatestSuccessfulBackupPosition(ctx, params) + if err != nil { + return false, err + } + params.IncrementalFromPos = mysql.EncodePosition(pos) + params.Logger.Infof("auto evaluated incremental_from_pos: %s", params.IncrementalFromPos) + } + // @@gtid_purged getPurgedGTIDSet := func() (mysql.Position, mysql.Mysql56GTIDSet, error) { gtidPurged, err := params.Mysqld.GetGTIDPurged(ctx) @@ -251,33 +261,6 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par } return gtidPurged, purgedGTIDSet, nil } - gtidPurged, purgedGTIDSet, err := getPurgedGTIDSet() - if err != nil { - return 
false, err - } - - if params.IncrementalFromPos == autoIncrementalFromPos { - params.Logger.Infof("auto evaluating incremental_from_pos") - bs, err := backupstorage.GetBackupStorage() - if err != nil { - return false, err - } - defer bs.Close() - - // Backups are stored in a directory structure that starts with - // / - backupDir := GetBackupDir(params.Keyspace, params.Shard) - bhs, err := bs.ListBackups(ctx, backupDir) - if err != nil { - return false, vterrors.Wrap(err, "ListBackups failed") - } - _, manifest, err := FindLatestSuccessfulBackup(ctx, params.Logger, bhs) - if err != nil { - return false, vterrors.Wrap(err, "FindLatestSuccessfulBackup failed") - } - params.IncrementalFromPos = mysql.EncodePosition(manifest.Position) - params.Logger.Infof("auto evaluated incremental_from_pos: %s", params.IncrementalFromPos) - } // params.IncrementalFromPos is a string. We want to turn that into a MySQL GTID backupFromGTIDSet, err := getIncrementalFromPosGTIDSet(params.IncrementalFromPos) @@ -300,6 +283,13 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par if err != nil { return false, vterrors.Wrapf(err, "cannot get binary logs in incremental backup") } + // gtid_purged is important information. The restore flow uses this info to to complement binary logs' Previous-GTIDs. + // It is important to only get gtid_purged _after_ we've rotated into the new binary log, because the `FLUSH BINARY LOGS` + // command may also purge old logs, hence affecting the value of gtid_purged. 
+ gtidPurged, purgedGTIDSet, err := getPurgedGTIDSet() + if err != nil { + return false, err + } previousGTIDs := map[string]string{} getBinlogPreviousGTIDs := func(ctx context.Context, binlog string) (gtids string, err error) { gtids, ok := previousGTIDs[binlog] From 99adb69f0b3119ba1fa4e22fd9c883ffc500cc93 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 19 Jul 2023 13:32:09 +0300 Subject: [PATCH 02/14] make the logic clearer Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/mysqlctl/builtinbackupengine.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index fd66d236add..9300e9a1c47 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -596,7 +596,8 @@ func (be *BuiltinBackupEngine) backupFiles( return vterrors.Wrapf(err, "cannot add %v to backup", backupManifestFileName) } defer func() { - if closeErr := wc.Close(); finalErr == nil { + closeErr := wc.Close() + if finalErr == nil { finalErr = closeErr } }() From fd9926f2e40cf4bc07a1172b4559c8e59282ca59 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 19 Jul 2023 13:52:08 +0300 Subject: [PATCH 03/14] Add 'FromBackup' field in BackupManifest, populate when using 'auto' incremental backup, to indicate which backup this backup is based on. 
Also, when looking for last successful backup, skip the partial backup handle that is the very backup we're still running Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/mysqlctl/backupengine.go | 26 ++++++++++++++++++-------- go/vt/mysqlctl/builtinbackupengine.go | 10 +++++++--- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/go/vt/mysqlctl/backupengine.go b/go/vt/mysqlctl/backupengine.go index d2f2fb66d11..6bde220e8f0 100644 --- a/go/vt/mysqlctl/backupengine.go +++ b/go/vt/mysqlctl/backupengine.go @@ -258,6 +258,9 @@ type BackupManifest struct { // which incremental changes are backed up. FromPosition mysql.Position + // FromBackup indicates the backup name on which this incremental backup is based, assuming this is an incremental backup with "auto" pos + FromBackup string + // Incremental indicates whether this is an incremental backup Incremental bool @@ -377,9 +380,16 @@ func (p *RestorePath) String() string { // FindLatestSuccessfulBackup returns the handle and manifest for the last good backup, // which can be either full or increment -func FindLatestSuccessfulBackup(ctx context.Context, logger logutil.Logger, bhs []backupstorage.BackupHandle) (backupstorage.BackupHandle, *BackupManifest, error) { +func FindLatestSuccessfulBackup(ctx context.Context, logger logutil.Logger, bhs []backupstorage.BackupHandle, excludeBackupName string) (backupstorage.BackupHandle, *BackupManifest, error) { for index := len(bhs) - 1; index >= 0; index-- { bh := bhs[index] + if bh.Name() == excludeBackupName { + // skip this bh. Use case: in an incremental backup, as we look for previous successful backups, + // the new incremental backup handle is partial: the directory exists, it will show in ListBackups, but + // the MANIFEST file does not exist yet. So we avoid the errors/warnings associated with reading this partial backup, + // and just skip it. 
+ continue + } // Check that the backup MANIFEST exists and can be successfully decoded. bm, err := GetBackupManifest(ctx, bh) if err != nil { @@ -392,10 +402,10 @@ func FindLatestSuccessfulBackup(ctx context.Context, logger logutil.Logger, bhs } // FindLatestSuccessfulBackupPosition returns the position of the last known successful backup -func FindLatestSuccessfulBackupPosition(ctx context.Context, params BackupParams) (pos mysql.Position, err error) { +func FindLatestSuccessfulBackupPosition(ctx context.Context, params BackupParams, excludeBackupName string) (backupName string, pos mysql.Position, err error) { bs, err := backupstorage.GetBackupStorage() if err != nil { - return pos, err + return "", pos, err } defer bs.Close() @@ -404,16 +414,16 @@ func FindLatestSuccessfulBackupPosition(ctx context.Context, params BackupParams backupDir := GetBackupDir(params.Keyspace, params.Shard) bhs, err := bs.ListBackups(ctx, backupDir) if err != nil { - return pos, vterrors.Wrap(err, "ListBackups failed") + return "", pos, vterrors.Wrap(err, "ListBackups failed") } - _, manifest, err := FindLatestSuccessfulBackup(ctx, params.Logger, bhs) + bh, manifest, err := FindLatestSuccessfulBackup(ctx, params.Logger, bhs, excludeBackupName) if err != nil { - return pos, vterrors.Wrap(err, "FindLatestSuccessfulBackup failed") + return "", pos, vterrors.Wrap(err, "FindLatestSuccessfulBackup failed") } pos = manifest.Position // For restore purposes, and for all we care, the backup position should also include the gtid_purged - pos.GTIDSet = pos.GTIDSet.Union(manifest.PurgedPosition.GTIDSet) - return pos, nil + // pos.GTIDSet = pos.GTIDSet.Union(manifest.PurgedPosition.GTIDSet) + return bh.Name(), pos, nil } // FindBackupToRestore returns a path, a sequence of backup handles, to be restored. 
diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index 9300e9a1c47..b2650548f9f 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -239,12 +239,14 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par return false, vterrors.Wrap(err, "can't get MySQL version") } + var fromBackupName string if params.IncrementalFromPos == autoIncrementalFromPos { params.Logger.Infof("auto evaluating incremental_from_pos") - pos, err := FindLatestSuccessfulBackupPosition(ctx, params) + backupName, pos, err := FindLatestSuccessfulBackupPosition(ctx, params, bh.Name()) if err != nil { return false, err } + fromBackupName = backupName params.IncrementalFromPos = mysql.EncodePosition(pos) params.Logger.Infof("auto evaluated incremental_from_pos: %s", params.IncrementalFromPos) } @@ -325,7 +327,7 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par // incrementalBackupFromGTID is the "previous GTIDs" of the first binlog file we back up. // It is a fact that incrementalBackupFromGTID is earlier or equal to params.IncrementalFromPos. // In the backup manifest file, we document incrementalBackupFromGTID, not the user's requested position. - if err := be.backupFiles(ctx, params, bh, incrementalBackupToPosition, gtidPurged, incrementalBackupFromPosition, binaryLogsToBackup, serverUUID, mysqlVersion); err != nil { + if err := be.backupFiles(ctx, params, bh, incrementalBackupToPosition, gtidPurged, incrementalBackupFromPosition, fromBackupName, binaryLogsToBackup, serverUUID, mysqlVersion); err != nil { return false, err } return true, nil @@ -436,7 +438,7 @@ func (be *BuiltinBackupEngine) executeFullBackup(ctx context.Context, params Bac } // Backup everything, capture the error. 
- backupErr := be.backupFiles(ctx, params, bh, replicationPosition, gtidPurgedPosition, mysql.Position{}, nil, serverUUID, mysqlVersion) + backupErr := be.backupFiles(ctx, params, bh, replicationPosition, gtidPurgedPosition, mysql.Position{}, "", nil, serverUUID, mysqlVersion) usable := backupErr == nil // Try to restart mysqld, use background context in case we timed out the original context @@ -519,6 +521,7 @@ func (be *BuiltinBackupEngine) backupFiles( replicationPosition mysql.Position, purgedPosition mysql.Position, fromPosition mysql.Position, + fromBackupName string, binlogFiles []string, serverUUID string, mysqlVersion string, @@ -610,6 +613,7 @@ func (be *BuiltinBackupEngine) backupFiles( Position: replicationPosition, PurgedPosition: purgedPosition, FromPosition: fromPosition, + FromBackup: fromBackupName, Incremental: !fromPosition.IsZero(), ServerUUID: serverUUID, TabletAlias: params.TabletAlias, From 9a8f17baf05880064c7ddff9ecfe247c2674d123 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 19 Jul 2023 13:53:13 +0300 Subject: [PATCH 04/14] include purged GTIDs in FindLatestSuccessfulBackupPosition Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/mysqlctl/backupengine.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/mysqlctl/backupengine.go b/go/vt/mysqlctl/backupengine.go index 6bde220e8f0..01e899e95b7 100644 --- a/go/vt/mysqlctl/backupengine.go +++ b/go/vt/mysqlctl/backupengine.go @@ -422,7 +422,7 @@ func FindLatestSuccessfulBackupPosition(ctx context.Context, params BackupParams } pos = manifest.Position // For restore purposes, and for all we care, the backup position should also include the gtid_purged - // pos.GTIDSet = pos.GTIDSet.Union(manifest.PurgedPosition.GTIDSet) + pos.GTIDSet = pos.GTIDSet.Union(manifest.PurgedPosition.GTIDSet) return bh.Name(), pos, nil } From 0421eef16bfd08902162fc6412989781cc7d2411 Mon Sep 17 00:00:00 2001 
From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 19 Jul 2023 17:49:03 +0300 Subject: [PATCH 05/14] In incremental backup, union the manifest's Position with gtid_purged Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/mysqlctl/backupengine.go | 2 -- go/vt/mysqlctl/builtinbackupengine.go | 12 ++++++++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/go/vt/mysqlctl/backupengine.go b/go/vt/mysqlctl/backupengine.go index 01e899e95b7..93d3bd285c3 100644 --- a/go/vt/mysqlctl/backupengine.go +++ b/go/vt/mysqlctl/backupengine.go @@ -421,8 +421,6 @@ func FindLatestSuccessfulBackupPosition(ctx context.Context, params BackupParams return "", pos, vterrors.Wrap(err, "FindLatestSuccessfulBackup failed") } pos = manifest.Position - // For restore purposes, and for all we care, the backup position should also include the gtid_purged - pos.GTIDSet = pos.GTIDSet.Union(manifest.PurgedPosition.GTIDSet) return bh.Name(), pos, nil } diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index b2650548f9f..1c050606277 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -318,6 +318,14 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par if err != nil { return false, vterrors.Wrapf(err, "cannot parse position %v", incrementalBackupToGTID) } + // The backup position is the GTIDSet of the last binary log (taken from Previous-GTIDs of the one-next binary log), and we + // also include gtid_purged ; this complies with the "standard" way MySQL "thinks" about GTIDs: there's gtid_executed, which includes + // everything that's ever been applied, and a subset of that is gtid_purged, which are the events no longer available in binary logs. 
+ // When we consider Vitess incremental backups, what's important for us is "what's the GTIDSet that's true when this backup was taken, + // and which will be true when we restore this backup". The answer to this is the GTIDSet that includes the purged GTIDs. + // It's also nice for incremental backups that are taken on _other_ tablets, so that they don't need to understand what exactly was purged + // on _this_ tablet. They don't care, all they want to know is "what GTIDSet can we get from this". + incrementalBackupToPosition.GTIDSet = incrementalBackupToPosition.GTIDSet.Union(gtidPurged.GTIDSet) // It's worthwhile we explain the difference between params.IncrementalFromPos and incrementalBackupFromPosition. // params.IncrementalFromPos is supplied by the user. They want an incremental backup that covers that position. // However, we implement incremental backups by copying complete binlog files. That position could potentially @@ -518,7 +526,7 @@ func (be *BuiltinBackupEngine) backupFiles( ctx context.Context, params BackupParams, bh backupstorage.BackupHandle, - replicationPosition mysql.Position, + backupPosition mysql.Position, purgedPosition mysql.Position, fromPosition mysql.Position, fromBackupName string, @@ -610,7 +618,7 @@ func (be *BuiltinBackupEngine) backupFiles( // Common base fields BackupManifest: BackupManifest{ BackupMethod: builtinBackupEngineName, - Position: replicationPosition, + Position: backupPosition, PurgedPosition: purgedPosition, FromPosition: fromPosition, FromBackup: fromBackupName, From 0bfc28617a0b3520f80b0c85ac36fa447ae3eecb Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 19 Jul 2023 17:59:06 +0300 Subject: [PATCH 06/14] PITR endtoend tests: add test for interleaved backups on different replicas. 
Simplify some backup_utils functionality, and support backup/restore for either one of two replicas Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../endtoend/backup/pitr/backup_pitr_test.go | 10 + .../backup/vtctlbackup/backup_utils.go | 117 ++++++----- .../backup/vtctlbackup/pitr_test_framework.go | 183 ++++++++++++++++-- 3 files changed, 251 insertions(+), 59 deletions(-) diff --git a/go/test/endtoend/backup/pitr/backup_pitr_test.go b/go/test/endtoend/backup/pitr/backup_pitr_test.go index d7f76b012e5..d9131fbb95c 100644 --- a/go/test/endtoend/backup/pitr/backup_pitr_test.go +++ b/go/test/endtoend/backup/pitr/backup_pitr_test.go @@ -31,3 +31,13 @@ func TestIncrementalBackupAndRestoreToPos(t *testing.T) { } backup.ExecTestIncrementalBackupAndRestoreToPos(t, tcase) } + +// TestIncrementalBackupOnTwoTablets +func TestIncrementalBackupOnTwoTablets(t *testing.T) { + tcase := &backup.PITRTestCase{ + Name: "BuiltinBackup", + SetupType: backup.BuiltinBackup, + ComprssDetails: nil, + } + backup.ExecTestIncrementalBackupOnTwoTablets(t, tcase) +} diff --git a/go/test/endtoend/backup/vtctlbackup/backup_utils.go b/go/test/endtoend/backup/vtctlbackup/backup_utils.go index 4b5bf897dda..21f1554fdf6 100644 --- a/go/test/endtoend/backup/vtctlbackup/backup_utils.go +++ b/go/test/endtoend/backup/vtctlbackup/backup_utils.go @@ -81,11 +81,12 @@ var ( } vtInsertTest = ` - create table vt_insert_test ( - id bigint auto_increment, - msg varchar(64), - primary key (id) - ) Engine=InnoDB` + create table vt_insert_test ( + id bigint auto_increment, + msg varchar(64), + primary key (id) + ) Engine=InnoDB + ` ) type CompressionDetails struct { @@ -162,11 +163,13 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp commonTabletArg = append(commonTabletArg, getCompressorArgs(cDetails)...) 
var mysqlProcs []*exec.Cmd + tabletTypes := map[int]string{ + 0: "primary", + 1: "replica", + 2: "rdonly", + } for i := 0; i < 3; i++ { - tabletType := "replica" - if i == 0 { - tabletType = "primary" - } + tabletType := tabletTypes[i] tablet := localCluster.NewVttabletInstance(tabletType, 0, cell) tablet.VttabletProcess = localCluster.VtprocessInstanceFromVttablet(tablet, shard.Name, keyspaceName) tablet.VttabletProcess.DbPassword = dbPassword @@ -219,13 +222,16 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp if err := localCluster.VtctlclientProcess.InitTablet(replica1, cell, keyspaceName, hostname, shard.Name); err != nil { return 1, err } + if err := localCluster.VtctlclientProcess.InitTablet(replica2, cell, keyspaceName, hostname, shard.Name); err != nil { + return 1, err + } vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", localCluster.VtctldProcess.GrpcPort, localCluster.TmpDirectory) _, err = vtctldClientProcess.ExecuteCommandWithOutput("SetKeyspaceDurabilityPolicy", keyspaceName, "--durability-policy=semi_sync") if err != nil { return 1, err } - for _, tablet := range []cluster.Vttablet{*primary, *replica1} { + for _, tablet := range []*cluster.Vttablet{primary, replica1, replica2} { if err := tablet.VttabletProcess.Setup(); err != nil { return 1, err } @@ -1068,40 +1074,30 @@ func terminateRestore(t *testing.T) { assert.True(t, found, "Restore message not found") } -func vtctlBackupReplicaNoDestroyNoWrites(t *testing.T, tabletType string) (backups []string, destroy func(t *testing.T)) { - restoreWaitForBackup(t, tabletType, nil, true) - verifyInitialReplication(t) - - err := localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias) - require.Nil(t, err) - - backups = localCluster.VerifyBackupCount(t, shardKsName, 1) +func vtctlBackupReplicaNoDestroyNoWrites(t *testing.T, replicaIndex int) (backups []string) { + replica := getReplica(t, replicaIndex) - verifyTabletBackupStats(t, 
replica1.VttabletProcess.GetVars()) - - err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, 25*time.Second) + err := localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica.Alias) require.Nil(t, err) - err = replica2.VttabletProcess.TearDown() + backups, err = localCluster.ListBackups(shardKsName) require.Nil(t, err) - err = localCluster.VtctlclientProcess.ExecuteCommand("DeleteTablet", replica2.Alias) - require.Nil(t, err) + verifyTabletBackupStats(t, replica.VttabletProcess.GetVars()) - destroy = func(t *testing.T) { - verifyAfterRemovingBackupNoBackupShouldBePresent(t, backups) - } - return backups, destroy + return backups } -func GetReplicaPosition(t *testing.T) string { - pos, _ := cluster.GetPrimaryPosition(t, *replica1, hostname) +func GetReplicaPosition(t *testing.T, replicaIndex int) string { + replica := getReplica(t, replicaIndex) + pos, _ := cluster.GetPrimaryPosition(t, *replica, hostname) return pos } -func GetReplicaGtidPurged(t *testing.T) string { +func GetReplicaGtidPurged(t *testing.T, replicaIndex int) string { + replica := getReplica(t, replicaIndex) query := "select @@global.gtid_purged as gtid_purged" - rs, err := replica1.VttabletProcess.QueryTablet(query, keyspaceName, true) + rs, err := replica.VttabletProcess.QueryTablet(query, keyspaceName, true) require.NoError(t, err) row := rs.Named().Row() require.NotNil(t, row) @@ -1134,13 +1130,26 @@ func ReadRowsFromPrimary(t *testing.T) (msgs []string) { return ReadRowsFromTablet(t, primary) } -func ReadRowsFromReplica(t *testing.T) (msgs []string) { - return ReadRowsFromTablet(t, replica1) +func getReplica(t *testing.T, replicaIndex int) *cluster.Vttablet { + switch replicaIndex { + case 0: + return replica1 + case 1: + return replica2 + default: + assert.Failf(t, "invalid replica index", "index=%d", replicaIndex) + return nil + } +} + +func ReadRowsFromReplica(t *testing.T, replicaIndex int) (msgs []string) { + return ReadRowsFromTablet(t, getReplica(t, 
replicaIndex)) } func readManifestFile(t *testing.T, backupLocation string) (manifest *mysqlctl.BackupManifest) { // reading manifest - data, err := os.ReadFile(backupLocation + "/MANIFEST") + fullPath := backupLocation + "/MANIFEST" + data, err := os.ReadFile(fullPath) require.NoErrorf(t, err, "error while reading MANIFEST %v", err) // parsing manifest @@ -1150,19 +1159,19 @@ func readManifestFile(t *testing.T, backupLocation string) (manifest *mysqlctl.B return manifest } -func TestReplicaFullBackup(t *testing.T) (manifest *mysqlctl.BackupManifest, destroy func(t *testing.T)) { - backups, destroy := vtctlBackupReplicaNoDestroyNoWrites(t, "replica") +func TestReplicaFullBackup(t *testing.T, replicaIndex int) (manifest *mysqlctl.BackupManifest) { + backups := vtctlBackupReplicaNoDestroyNoWrites(t, replicaIndex) backupLocation := localCluster.CurrentVTDATAROOT + "/backups/" + shardKsName + "/" + backups[len(backups)-1] - return readManifestFile(t, backupLocation), destroy + return readManifestFile(t, backupLocation) } -func TestReplicaIncrementalBackup(t *testing.T, incrementalFromPos mysql.Position, expectError string) (manifest *mysqlctl.BackupManifest, backupName string) { +func testReplicaIncrementalBackup(t *testing.T, replica *cluster.Vttablet, incrementalFromPos mysql.Position, expectError string) (manifest *mysqlctl.BackupManifest, backupName string) { incrementalFromPosArg := "auto" if !incrementalFromPos.IsZero() { incrementalFromPosArg = mysql.EncodePosition(incrementalFromPos) } - output, err := localCluster.VtctlclientProcess.ExecuteCommandWithOutput("Backup", "--", "--incremental_from_pos", incrementalFromPosArg, replica1.Alias) + output, err := localCluster.VtctlclientProcess.ExecuteCommandWithOutput("Backup", "--", "--incremental_from_pos", incrementalFromPosArg, replica.Alias) if expectError != "" { require.Errorf(t, err, "expected: %v", expectError) require.Contains(t, output, expectError) @@ -1172,23 +1181,43 @@ func TestReplicaIncrementalBackup(t 
*testing.T, incrementalFromPos mysql.Positio backups, err := localCluster.ListBackups(shardKsName) require.NoError(t, err) - verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars()) + verifyTabletBackupStats(t, replica.VttabletProcess.GetVars()) backupName = backups[len(backups)-1] backupLocation := localCluster.CurrentVTDATAROOT + "/backups/" + shardKsName + "/" + backupName return readManifestFile(t, backupLocation), backupName } -func TestReplicaRestoreToPos(t *testing.T, restoreToPos mysql.Position, expectError string) { +func TestReplicaIncrementalBackup(t *testing.T, replicaIndex int, incrementalFromPos mysql.Position, expectError string) (manifest *mysqlctl.BackupManifest, backupName string) { + replica := getReplica(t, replicaIndex) + return testReplicaIncrementalBackup(t, replica, incrementalFromPos, expectError) +} + +func TestReplicaFullRestore(t *testing.T, replicaIndex int, expectError string) { + replica := getReplica(t, replicaIndex) + + output, err := localCluster.VtctlclientProcess.ExecuteCommandWithOutput("RestoreFromBackup", replica.Alias) + if expectError != "" { + require.Errorf(t, err, "expected: %v", expectError) + require.Contains(t, output, expectError) + return + } + require.NoErrorf(t, err, "output: %v", output) + verifyTabletRestoreStats(t, replica.VttabletProcess.GetVars()) +} + +func TestReplicaRestoreToPos(t *testing.T, replicaIndex int, restoreToPos mysql.Position, expectError string) { + replica := getReplica(t, replicaIndex) + require.False(t, restoreToPos.IsZero()) restoreToPosArg := mysql.EncodePosition(restoreToPos) - output, err := localCluster.VtctlclientProcess.ExecuteCommandWithOutput("RestoreFromBackup", "--", "--restore_to_pos", restoreToPosArg, replica1.Alias) + output, err := localCluster.VtctlclientProcess.ExecuteCommandWithOutput("RestoreFromBackup", "--", "--restore_to_pos", restoreToPosArg, replica.Alias) if expectError != "" { require.Errorf(t, err, "expected: %v", expectError) require.Contains(t, output, 
expectError) return } require.NoErrorf(t, err, "output: %v", output) - verifyTabletRestoreStats(t, replica1.VttabletProcess.GetVars()) + verifyTabletRestoreStats(t, replica.VttabletProcess.GetVars()) } func verifyTabletBackupStats(t *testing.T, vars map[string]any) { diff --git a/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go b/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go index a5bc6d15b0f..a697a5b23b5 100644 --- a/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go +++ b/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go @@ -30,18 +30,28 @@ import ( "vitess.io/vitess/go/test/endtoend/cluster" ) +const ( + postWriteSleepDuration = 2*time.Second + 100*time.Millisecond +) + +const ( + operationFullBackup = iota + operationIncrementalBackup + operationRestore +) + type PITRTestCase struct { Name string SetupType int ComprssDetails *CompressionDetails } -func waitForReplica(t *testing.T) { +func waitForReplica(t *testing.T, replicaIndex int) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() pMsgs := ReadRowsFromPrimary(t) for { - rMsgs := ReadRowsFromReplica(t) + rMsgs := ReadRowsFromReplica(t, replicaIndex) if len(pMsgs) == len(rMsgs) { // success return @@ -56,7 +66,9 @@ func waitForReplica(t *testing.T) { } } -// ExecTestIncrementalBackupAndRestoreToPos +// ExecTestIncrementalBackupAndRestoreToPos runs a series of backups: a full backup and multiple incremental backups. +// in between, it makes writes to the database, and takes notes: what data was available in what backup. +// It then restores each and every one of those backups, in random order, and expects to find the specific data associated with the backup. 
func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase) { defer cluster.PanicHandler(t) @@ -72,8 +84,8 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase) backupPositions := []string{} recordRowsPerPosition := func(t *testing.T) { - pos := GetReplicaPosition(t) - msgs := ReadRowsFromReplica(t) + pos := GetReplicaPosition(t, 0) + msgs := ReadRowsFromReplica(t, 0) if _, ok := rowsPerPosition[pos]; !ok { backupPositions = append(backupPositions, pos) rowsPerPosition[pos] = len(msgs) @@ -83,13 +95,13 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase) var fullBackupPos mysql.Position t.Run("full backup", func(t *testing.T) { InsertRowOnPrimary(t, "before-full-backup") - waitForReplica(t) + waitForReplica(t, 0) - manifest, _ := TestReplicaFullBackup(t) + manifest := TestReplicaFullBackup(t, 0) fullBackupPos = manifest.Position require.False(t, fullBackupPos.IsZero()) // - msgs := ReadRowsFromReplica(t) + msgs := ReadRowsFromReplica(t, 0) pos := mysql.EncodePosition(fullBackupPos) backupPositions = append(backupPositions, pos) rowsPerPosition[pos] = len(msgs) @@ -150,8 +162,8 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase) // in 1 second resolution. We want to avoid two backups that have the same pathname. Realistically this // is only ever a problem in this end-to-end test, not in production. // Also, we gie the replica a chance to catch up. 
- time.Sleep(1100 * time.Millisecond) - waitForReplica(t) + time.Sleep(postWriteSleepDuration) + waitForReplica(t, 0) recordRowsPerPosition(t) // configure --incremental-from-pos to either: // - auto @@ -164,7 +176,8 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase) incrementalFromPos = fullBackupPos } } - manifest, backupName := TestReplicaIncrementalBackup(t, incrementalFromPos, tc.expectError) + // always use same 1st replica + manifest, backupName := TestReplicaIncrementalBackup(t, 0, incrementalFromPos, tc.expectError) if tc.expectError != "" { return } @@ -178,11 +191,11 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase) require.NotEqual(t, manifest.Position, manifest.FromPosition) require.True(t, manifest.Position.GTIDSet.Union(manifest.PurgedPosition.GTIDSet).Contains(manifest.FromPosition.GTIDSet)) - gtidPurgedPos, err := mysql.ParsePosition(mysql.Mysql56FlavorID, GetReplicaGtidPurged(t)) + gtidPurgedPos, err := mysql.ParsePosition(mysql.Mysql56FlavorID, GetReplicaGtidPurged(t, 0)) require.NoError(t, err) fromPositionIncludingPurged := manifest.FromPosition.GTIDSet.Union(gtidPurgedPos.GTIDSet) - expectFromPosition := lastBackupPos.GTIDSet.Union(gtidPurgedPos.GTIDSet) + expectFromPosition := lastBackupPos.GTIDSet if !incrementalFromPos.IsZero() { expectFromPosition = incrementalFromPos.GTIDSet.Union(gtidPurgedPos.GTIDSet) } @@ -197,8 +210,8 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase) t.Run(testName, func(t *testing.T) { restoreToPos, err := mysql.DecodePosition(pos) require.NoError(t, err) - TestReplicaRestoreToPos(t, restoreToPos, "") - msgs := ReadRowsFromReplica(t) + TestReplicaRestoreToPos(t, 0, restoreToPos, "") + msgs := ReadRowsFromReplica(t, 0) count, ok := rowsPerPosition[pos] require.True(t, ok) assert.Equalf(t, count, len(msgs), "messages: %v", msgs) @@ -219,3 +232,143 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase 
*PITRTestCase) }) }) } + +// ExecTestIncrementalBackupAndRestoreToPos runs a series of interleaved backups on two different replicas: full and incremental. +// Specifically, it's designed to test how incremental backups are taken by interleaved replicas, so that they successfully build on +// one another. +func ExecTestIncrementalBackupOnTwoTablets(t *testing.T, tcase *PITRTestCase) { + defer cluster.PanicHandler(t) + + t.Run(tcase.Name, func(t *testing.T) { + // setup cluster for the testing + code, err := LaunchCluster(tcase.SetupType, "xbstream", 0, tcase.ComprssDetails) + require.NoError(t, err, "setup failed with status code %d", code) + defer TearDownCluster() + + InitTestTable(t) + + rowsPerPosition := map[string]int{} + + recordRowsPerPosition := func(t *testing.T, replicaIndex int) { + pos := GetReplicaPosition(t, replicaIndex) + msgs := ReadRowsFromReplica(t, replicaIndex) + if _, ok := rowsPerPosition[pos]; !ok { + rowsPerPosition[pos] = len(msgs) + } + } + + var lastBackupPos mysql.Position + InsertRowOnPrimary(t, "before-incremental-backups") + waitForReplica(t, 0) + waitForReplica(t, 1) + + tt := []struct { + name string + operationType int + replicaIndex int + expectError string + }{ + { + name: "full1", + operationType: operationFullBackup, + }, + { + name: "incremental1", + operationType: operationIncrementalBackup, + }, + { + name: "restore1", + operationType: operationRestore, + }, + { + name: "incremental2", + operationType: operationIncrementalBackup, + replicaIndex: 1, + }, + { + name: "full2", + operationType: operationFullBackup, + replicaIndex: 1, + }, + { + name: "incremental2-after-full2", + operationType: operationIncrementalBackup, + replicaIndex: 1, + }, + { + name: "restore2", + operationType: operationRestore, + replicaIndex: 1, + }, + { + name: "incremental-replica1", + operationType: operationIncrementalBackup, + }, + { + name: "incremental-replica2", + operationType: operationIncrementalBackup, + replicaIndex: 1, + }, + { + name: 
"incremental-replica1", + operationType: operationIncrementalBackup, + }, + { + name: "incremental-replica2", + operationType: operationIncrementalBackup, + replicaIndex: 1, + }, + } + insertRowAndWait := func(t *testing.T, replicaIndex int, data string) { + t.Run("insert row and wait", func(t *testing.T) { + InsertRowOnPrimary(t, data) + time.Sleep(postWriteSleepDuration) + waitForReplica(t, replicaIndex) + recordRowsPerPosition(t, replicaIndex) + }) + } + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + insertRowAndWait(t, tc.replicaIndex, tc.name) + t.Run("running operation", func(t *testing.T) { + switch tc.operationType { + case operationFullBackup: + manifest := TestReplicaFullBackup(t, tc.replicaIndex) + fullBackupPos := manifest.Position + require.False(t, fullBackupPos.IsZero()) + // + msgs := ReadRowsFromReplica(t, tc.replicaIndex) + pos := mysql.EncodePosition(fullBackupPos) + rowsPerPosition[pos] = len(msgs) + + lastBackupPos = fullBackupPos + case operationIncrementalBackup: + var incrementalFromPos mysql.Position // keep zero, we will use "auto" + manifest, _ := TestReplicaIncrementalBackup(t, tc.replicaIndex, incrementalFromPos, tc.expectError) + if tc.expectError != "" { + return + } + defer func() { + lastBackupPos = manifest.Position + }() + require.False(t, manifest.FromPosition.IsZero()) + require.NotEqual(t, manifest.Position, manifest.FromPosition) + require.True(t, manifest.Position.GTIDSet.Union(manifest.PurgedPosition.GTIDSet).Contains(manifest.FromPosition.GTIDSet)) + + gtidPurgedPos, err := mysql.ParsePosition(mysql.Mysql56FlavorID, GetReplicaGtidPurged(t, tc.replicaIndex)) + require.NoError(t, err) + fromPositionIncludingPurged := manifest.FromPosition.GTIDSet.Union(gtidPurgedPos.GTIDSet) + + require.True(t, lastBackupPos.GTIDSet.Contains(fromPositionIncludingPurged), "expected: %v to contain %v", lastBackupPos.GTIDSet, fromPositionIncludingPurged) + case operationRestore: + TestReplicaFullRestore(t, tc.replicaIndex, "") + 
// should rejoin the replication stream
+						insertRowAndWait(t, tc.replicaIndex, "post-restore-check")
+					default:
+						require.FailNowf(t, "unknown operation type", "operation: %d", tc.operationType)
+					}
+				})
+			})
+		}
+	})
+}

From bf9f3977fe9f7420d2e62a8dd13825bc457674c6 Mon Sep 17 00:00:00 2001
From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Date: Tue, 25 Jul 2023 12:15:46 +0300
Subject: [PATCH 07/14] simplify post write sleep duration

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
---
 go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go b/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
index a697a5b23b5..aea01dfc4c8 100644
--- a/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
+++ b/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
@@ -31,7 +31,7 @@ import (
 )
 
 const (
-	postWriteSleepDuration = 2*time.Second + 100*time.Millisecond
+	postWriteSleepDuration = 2 * time.Second // Nice for debugging purposes: clearly distinguishes the timestamps of certain operations, and as a result the names/timestamps of backups.
) const ( From 5cb0561950411d76b3ae886c0aa890d7bc1c4d08 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 26 Jul 2023 11:41:18 +0300 Subject: [PATCH 08/14] kill mysqlbinlog on early break Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/mysqlctl/mysqld.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/go/vt/mysqlctl/mysqld.go b/go/vt/mysqlctl/mysqld.go index 1f4bc7036d8..e809684fdc3 100644 --- a/go/vt/mysqlctl/mysqld.go +++ b/go/vt/mysqlctl/mysqld.go @@ -1363,6 +1363,7 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc } scanner := bufio.NewScanner(pipe) scanComplete := make(chan error) + intentionalKill := false scan := func() { defer close(scanComplete) // Read line by line and process it @@ -1379,6 +1380,10 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc matchFound = true } if found && stopAtFirst { + // Found the first timestamp and it's all we need. We won't scan any further and so we should also + // kill mysqlbinlog (otherwise it keeps waiting until we've read the entire pipe). 
+				intentionalKill = true
+				mysqlbinlogCmd.Process.Kill()
 				return
 			}
 		}
@@ -1387,7 +1392,7 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc
 		return matchedTime, false, err
 	}
 	go scan()
-	if err := mysqlbinlogCmd.Wait(); err != nil {
+	if err := mysqlbinlogCmd.Wait(); err != nil && !intentionalKill {
 		return matchedTime, false, vterrors.Wrapf(err, "waiting on mysqlbinlog command in ReadBinlogFilesTimestamps")
 	}
 	if err := <-scanComplete; err != nil {

From 2315db17175d855d55a10c8bd80c73233415958b Mon Sep 17 00:00:00 2001
From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Date: Wed, 26 Jul 2023 11:53:08 +0300
Subject: [PATCH 09/14] backup utils: flush binary logs functionality; flush and purge binary logs functionality

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
---
 .../backup/vtctlbackup/backup_utils.go        | 36 +++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git a/go/test/endtoend/backup/vtctlbackup/backup_utils.go b/go/test/endtoend/backup/vtctlbackup/backup_utils.go
index 6c4a349b357..25442d90fd4 100644
--- a/go/test/endtoend/backup/vtctlbackup/backup_utils.go
+++ b/go/test/endtoend/backup/vtctlbackup/backup_utils.go
@@ -1146,6 +1146,42 @@ func ReadRowsFromReplica(t *testing.T, replicaIndex int) (msgs []string) {
 	return ReadRowsFromTablet(t, getReplica(t, replicaIndex))
 }
 
+// FlushBinaryLogsOnReplica issues `FLUSH BINARY LOGS` <count> times
+func FlushBinaryLogsOnReplica(t *testing.T, replicaIndex int, count int) {
+	replica := getReplica(t, replicaIndex)
+	query := "flush binary logs"
+	for i := 0; i < count; i++ {
+		_, err := replica.VttabletProcess.QueryTablet(query, keyspaceName, true)
+		require.NoError(t, err)
+	}
+}
+
+// FlushAndPurgeBinaryLogsOnReplica intentionally loses all existing binary logs. It flushes into a new binary log
+// and immediately purges all previous logs.
+// This is used to lose information.
+func FlushAndPurgeBinaryLogsOnReplica(t *testing.T, replicaIndex int) (lastBinlog string) { + FlushBinaryLogsOnReplica(t, replicaIndex, 1) + + replica := getReplica(t, replicaIndex) + { + query := "show binary logs" + rs, err := replica.VttabletProcess.QueryTablet(query, keyspaceName, true) + require.NoError(t, err) + require.NotEmpty(t, rs.Rows) + for _, row := range rs.Rows { + // binlog file name is first column + lastBinlog = row[0].ToString() + } + } + { + query, err := sqlparser.ParseAndBind("purge binary logs to %a", sqltypes.StringBindVariable(lastBinlog)) + require.NoError(t, err) + _, err = replica.VttabletProcess.QueryTablet(query, keyspaceName, true) + require.NoError(t, err) + } + return lastBinlog +} + func readManifestFile(t *testing.T, backupLocation string) (manifest *mysqlctl.BackupManifest) { // reading manifest fullPath := backupLocation + "/MANIFEST" From 3fa39dffaa9bc92229dea2db71f2e5ac205d19e5 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 26 Jul 2023 12:02:26 +0300 Subject: [PATCH 10/14] ExecTestIncrementalBackupOnTwoTablets: add further tests sequence that validate incremental backups fail when binlog data is purged, but that the situation is salvaged once a full backup runs. 
ExecTestIncrementalBackupAndRestoreToPos: run a random number (0, 1, 2) number of FLUSH LOGS, just to show the number of binary logs, some of which may be empty, works Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../backup/vtctlbackup/pitr_test_framework.go | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go b/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go index 231169c9c79..6ef5e3ed83b 100644 --- a/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go +++ b/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go @@ -43,6 +43,7 @@ const ( operationFullBackup = iota operationIncrementalBackup operationRestore + operationFlushAndPurge ) type PITRTestCase struct { @@ -173,6 +174,8 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase) // is only ever a problem in this end-to-end test, not in production. // Also, we gie the replica a chance to catch up. 
time.Sleep(postWriteSleepDuration) + // randomly flush binary logs 0, 1 or 2 times + FlushBinaryLogsOnReplica(t, 0, rand.Intn(3)) waitForReplica(t, 0) recordRowsPerPosition(t) // configure --incremental-from-pos to either: @@ -472,6 +475,7 @@ func ExecTestIncrementalBackupOnTwoTablets(t *testing.T, tcase *PITRTestCase) { replicaIndex int expectError string }{ + // The following tests run sequentially and build on top of previous results { name: "full1", operationType: operationFullBackup, @@ -485,6 +489,7 @@ func ExecTestIncrementalBackupOnTwoTablets(t *testing.T, tcase *PITRTestCase) { operationType: operationRestore, }, { + // Shows you can take an incremental restore when full & incremental backups were only ever executed on a different replica name: "incremental2", operationType: operationIncrementalBackup, replicaIndex: 1, @@ -495,6 +500,7 @@ func ExecTestIncrementalBackupOnTwoTablets(t *testing.T, tcase *PITRTestCase) { replicaIndex: 1, }, { + // This incremental backup will use full2 as the base backup name: "incremental2-after-full2", operationType: operationIncrementalBackup, replicaIndex: 1, @@ -504,6 +510,7 @@ func ExecTestIncrementalBackupOnTwoTablets(t *testing.T, tcase *PITRTestCase) { operationType: operationRestore, replicaIndex: 1, }, + // Begin a series of interleaved incremental backups { name: "incremental-replica1", operationType: operationIncrementalBackup, @@ -522,6 +529,43 @@ func ExecTestIncrementalBackupOnTwoTablets(t *testing.T, tcase *PITRTestCase) { operationType: operationIncrementalBackup, replicaIndex: 1, }, + // Done interleaved backups. 
+ { + // Lose binary log data + name: "flush and purge 1", + operationType: operationFlushAndPurge, + replicaIndex: 0, + }, + { + // Fail to run incremental backup due to lost data + name: "incremental-replica1 failure", + operationType: operationIncrementalBackup, + expectError: "Required entries have been purged", + }, + { + // Lose binary log data + name: "flush and purge 2", + operationType: operationFlushAndPurge, + replicaIndex: 1, + }, + { + // Fail to run incremental backup due to lost data + name: "incremental-replica2 failure", + operationType: operationIncrementalBackup, + replicaIndex: 1, + expectError: "Required entries have been purged", + }, + { + // Since we've lost binlog data, incremental backups are no longer possible. The situation can be salvaged by running a full backup + name: "full1 after purge", + operationType: operationFullBackup, + }, + { + // Show that replica2 incremental backup is able to work based on the above full backup + name: "incremental-replica2 after purge and backup", + operationType: operationIncrementalBackup, + replicaIndex: 1, + }, } insertRowAndWait := func(t *testing.T, replicaIndex int, data string) { t.Run("insert row and wait", func(t *testing.T) { @@ -536,6 +580,8 @@ func ExecTestIncrementalBackupOnTwoTablets(t *testing.T, tcase *PITRTestCase) { insertRowAndWait(t, tc.replicaIndex, tc.name) t.Run("running operation", func(t *testing.T) { switch tc.operationType { + case operationFlushAndPurge: + FlushAndPurgeBinaryLogsOnReplica(t, tc.replicaIndex) case operationFullBackup: manifest := TestReplicaFullBackup(t, tc.replicaIndex) fullBackupPos := manifest.Position From 0411c5049e28d858c99711b2d1a0b7fa95711e8d Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 8 Aug 2023 12:01:55 +0300 Subject: [PATCH 11/14] switching to vtctldclient Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/test/endtoend/backup/vtctlbackup/backup_utils.go | 2 
+- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/test/endtoend/backup/vtctlbackup/backup_utils.go b/go/test/endtoend/backup/vtctlbackup/backup_utils.go index 804dea8018d..7ff9d6b860f 100644 --- a/go/test/endtoend/backup/vtctlbackup/backup_utils.go +++ b/go/test/endtoend/backup/vtctlbackup/backup_utils.go @@ -1208,7 +1208,7 @@ func testReplicaIncrementalBackup(t *testing.T, replica *cluster.Vttablet, incre if !incrementalFromPos.IsZero() { incrementalFromPosArg = replication.EncodePosition(incrementalFromPos) } - output, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("Backup", "--incremental_from_pos", incrementalFromPosArg, replica.Alias) + output, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("Backup", "--incremental-from-pos", incrementalFromPosArg, replica.Alias) if expectError != "" { require.Errorf(t, err, "expected: %v", expectError) require.Contains(t, output, expectError) From ff46ab57436702ac802734b631e093c9e5ddf2b6 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 8 Aug 2023 14:14:03 +0300 Subject: [PATCH 12/14] fix function comment Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go b/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go index 5706501c716..ddeb43c7dd7 100644 --- a/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go +++ b/go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go @@ -441,7 +441,7 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes }) } -// ExecTestIncrementalBackupAndRestoreToPos runs a series of interleaved backups on two different replicas: full and incremental. 
+// ExecTestIncrementalBackupOnTwoTablets runs a series of interleaved backups on two different replicas: full and incremental. // Specifically, it's designed to test how incremental backups are taken by interleaved replicas, so that they successfully build on // one another. func ExecTestIncrementalBackupOnTwoTablets(t *testing.T, tcase *PITRTestCase) { From c1644edc1e858b90ef5e91c2370c0da02738a6f4 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 8 Aug 2023 14:14:47 +0300 Subject: [PATCH 13/14] typo Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/cmd/vtctldclient/command/schema.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/cmd/vtctldclient/command/schema.go b/go/cmd/vtctldclient/command/schema.go index 280463b2792..2ebe6bbb917 100644 --- a/go/cmd/vtctldclient/command/schema.go +++ b/go/cmd/vtctldclient/command/schema.go @@ -298,7 +298,7 @@ func init() { ApplySchema.Flags().StringVar(&applySchemaOptions.CallerID, "caller-id", "", "Effective caller ID used for the operation and should map to an ACL name which grants this identity the necessary permissions to perform the operation (this is only necessary when strict table ACLs are used).") ApplySchema.Flags().StringArrayVar(&applySchemaOptions.SQL, "sql", nil, "Semicolon-delimited, repeatable SQL commands to apply. Exactly one of --sql|--sql-file is required.") ApplySchema.Flags().StringVar(&applySchemaOptions.SQLFile, "sql-file", "", "Path to a file containing semicolon-delimited SQL commands to apply. Exactly one of --sql|--sql-file is required.") - ApplySchema.Flags().Int64Var(&applySchemaOptions.BatchSize, "batch-size", 0, "How many queries to batch together. Only applicabel when all queries are CREATE TABLE|VIEW") + ApplySchema.Flags().Int64Var(&applySchemaOptions.BatchSize, "batch-size", 0, "How many queries to batch together. 
Only applicable when all queries are CREATE TABLE|VIEW") Root.AddCommand(ApplySchema) From 10722f0eccdd236a0ee45395f3c95643bc78fa29 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 8 Aug 2023 14:16:01 +0300 Subject: [PATCH 14/14] function comment Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/test/endtoend/backup/pitr/backup_pitr_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/go/test/endtoend/backup/pitr/backup_pitr_test.go b/go/test/endtoend/backup/pitr/backup_pitr_test.go index 59c105e94e2..a1b29ef47dd 100644 --- a/go/test/endtoend/backup/pitr/backup_pitr_test.go +++ b/go/test/endtoend/backup/pitr/backup_pitr_test.go @@ -56,7 +56,9 @@ func TestIncrementalBackupAndRestoreToTimestamp(t *testing.T) { backup.ExecTestIncrementalBackupAndRestoreToTimestamp(t, tcase) } -// TestIncrementalBackupOnTwoTablets +// TestIncrementalBackupOnTwoTablets runs a series of interleaved backups on two different replicas: full and incremental. +// Specifically, it's designed to test how incremental backups are taken by interleaved replicas, so that they successfully build on +// one another. func TestIncrementalBackupOnTwoTablets(t *testing.T) { tcase := &backup.PITRTestCase{ Name: "BuiltinBackup",