From 245d0dd857b20c4d49226848c6f5203c6eae6a36 Mon Sep 17 00:00:00 2001 From: Ignacio Corderi Date: Thu, 17 Nov 2022 19:20:36 -0300 Subject: [PATCH 1/3] store: move catchpoint sql code into store --- ledger/accountdb.go | 198 ++------------------------- ledger/accountdb_test.go | 28 ++-- ledger/catchpointtracker.go | 67 +++++---- ledger/catchpointtracker_test.go | 50 ++++--- ledger/catchupaccessor.go | 58 ++++---- ledger/msgp_gen.go | 54 -------- ledger/store/catchpoint.go | 228 +++++++++++++++++++++++++++++++ ledger/store/interface.go | 31 ++++- ledger/store/msgp_gen.go | 54 ++++++++ ledger/trackerdb.go | 14 +- 10 files changed, 445 insertions(+), 337 deletions(-) create mode 100644 ledger/store/catchpoint.go diff --git a/ledger/accountdb.go b/ledger/accountdb.go index 5ba566bfa2..840ab5e52f 100644 --- a/ledger/accountdb.go +++ b/ledger/accountdb.go @@ -239,35 +239,32 @@ type compactOnlineAccountDeltas struct { misses []int } -// catchpointState is used to store catchpoint related variables into the catchpointstate table. -type catchpointState string - const ( // catchpointStateLastCatchpoint is written by a node once a catchpoint label is created for a round - catchpointStateLastCatchpoint = catchpointState("lastCatchpoint") + catchpointStateLastCatchpoint = store.CatchpointState("lastCatchpoint") // This state variable is set to 1 if catchpoint's first stage is unfinished, // and is 0 otherwise. Used to clear / restart the first stage after a crash. // This key is set in the same db transaction as the account updates, so the // unfinished first stage corresponds to the current db round. - catchpointStateWritingFirstStageInfo = catchpointState("writingFirstStageInfo") + catchpointStateWritingFirstStageInfo = store.CatchpointState("writingFirstStageInfo") // If there is an unfinished catchpoint, this state variable is set to // the catchpoint's round. Otherwise, it is set to 0. // DEPRECATED. - catchpointStateWritingCatchpoint = catchpointState("writingCatchpoint") + catchpointStateWritingCatchpoint = store.CatchpointState("writingCatchpoint") // catchpointCatchupState is the state of the catchup process. The variable is stored only during the catchpoint catchup process, and removed afterward. - catchpointStateCatchupState = catchpointState("catchpointCatchupState") + catchpointStateCatchupState = store.CatchpointState("catchpointCatchupState") // catchpointStateCatchupLabel is the label to which the currently catchpoint catchup process is trying to catchup to. - catchpointStateCatchupLabel = catchpointState("catchpointCatchupLabel") + catchpointStateCatchupLabel = store.CatchpointState("catchpointCatchupLabel") // catchpointCatchupBlockRound is the block round that is associated with the current running catchpoint catchup. - catchpointStateCatchupBlockRound = catchpointState("catchpointCatchupBlockRound") + catchpointStateCatchupBlockRound = store.CatchpointState("catchpointCatchupBlockRound") // catchpointStateCatchupBalancesRound is the balance round that is associated with the current running catchpoint catchup. Typically it would be // equal to catchpointStateCatchupBlockRound - 320. - catchpointStateCatchupBalancesRound = catchpointState("catchpointCatchupBalancesRound") + catchpointStateCatchupBalancesRound = store.CatchpointState("catchpointCatchupBalancesRound") // catchpointStateCatchupHashRound is the round that is associated with the hash of the merkle trie. Normally, it's identical to catchpointStateCatchupBalancesRound, // however, it could differ when we catchup from a catchpoint that was created using a different version : in this case, // we set it to zero in order to reset the merkle trie. This would force the merkle trie to be re-build on startup ( if needed ). - catchpointStateCatchupHashRound = catchpointState("catchpointCatchupHashRound") - catchpointStateCatchpointLookback = catchpointState("catchpointLookback") + catchpointStateCatchupHashRound = store.CatchpointState("catchpointCatchupHashRound") + catchpointStateCatchpointLookback = store.CatchpointState("catchpointLookback") ) // MaxEncodedBaseAccountDataSize is a rough estimate for the worst-case scenario we're going to have of the base account data serialized. @@ -1145,11 +1142,6 @@ func applyCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, balancesRou return } -func getCatchpoint(ctx context.Context, q db.Queryable, round basics.Round) (fileName string, catchpoint string, fileSize int64, err error) { - err = q.QueryRowContext(ctx, "SELECT filename, catchpoint, filesize FROM storedcatchpoints WHERE round=?", int64(round)).Scan(&fileName, &catchpoint, &fileSize) - return -} - // accountsInit fills the database using tx with initAccounts if the // database has not been initialized yet. // @@ -1916,122 +1908,6 @@ func accountsHashRound(ctx context.Context, tx *sql.Tx) (hashrnd basics.Round, e return } -func storeCatchpoint(ctx context.Context, e db.Executable, round basics.Round, fileName string, catchpoint string, fileSize int64) (err error) { - err = db.Retry(func() (err error) { - query := "DELETE FROM storedcatchpoints WHERE round=?" - _, err = e.ExecContext(ctx, query, round) - if err != nil || (fileName == "" && catchpoint == "" && fileSize == 0) { - return err - } - - query = "INSERT INTO storedcatchpoints(round, filename, catchpoint, filesize, pinned) VALUES(?, ?, ?, ?, 0)" - _, err = e.ExecContext(ctx, query, round, fileName, catchpoint, fileSize) - return err - }) - return -} - -func getOldestCatchpointFiles(ctx context.Context, q db.Queryable, fileCount int, filesToKeep int) (fileNames map[basics.Round]string, err error) { - err = db.Retry(func() (err error) { - query := "SELECT round, filename FROM storedcatchpoints WHERE pinned = 0 and round <= COALESCE((SELECT round FROM storedcatchpoints WHERE pinned = 0 ORDER BY round DESC LIMIT ?, 1),0) ORDER BY round ASC LIMIT ?" - rows, err := q.QueryContext(ctx, query, filesToKeep, fileCount) - if err != nil { - return err - } - defer rows.Close() - - fileNames = make(map[basics.Round]string) - for rows.Next() { - var fileName string - var round basics.Round - err = rows.Scan(&round, &fileName) - if err != nil { - return err - } - fileNames[round] = fileName - } - - return rows.Err() - }) - if err != nil { - fileNames = nil - } - return -} - -func readCatchpointStateUint64(ctx context.Context, q db.Queryable, stateName catchpointState) (val uint64, err error) { - err = db.Retry(func() (err error) { - query := "SELECT intval FROM catchpointstate WHERE id=?" - var v sql.NullInt64 - err = q.QueryRowContext(ctx, query, stateName).Scan(&v) - if err == sql.ErrNoRows { - return nil - } - if err != nil { - return err - } - if v.Valid { - val = uint64(v.Int64) - } - return nil - }) - return val, err -} - -func writeCatchpointStateUint64(ctx context.Context, e db.Executable, stateName catchpointState, setValue uint64) (err error) { - err = db.Retry(func() (err error) { - if setValue == 0 { - return deleteCatchpointStateImpl(ctx, e, stateName) - } - - // we don't know if there is an entry in the table for this state, so we'll insert/replace it just in case. - query := "INSERT OR REPLACE INTO catchpointstate(id, intval) VALUES(?, ?)" - _, err = e.ExecContext(ctx, query, stateName, setValue) - return err - }) - return err -} - -func readCatchpointStateString(ctx context.Context, q db.Queryable, stateName catchpointState) (val string, err error) { - err = db.Retry(func() (err error) { - query := "SELECT strval FROM catchpointstate WHERE id=?" - var v sql.NullString - err = q.QueryRowContext(ctx, query, stateName).Scan(&v) - if err == sql.ErrNoRows { - return nil - } - if err != nil { - return err - } - - if v.Valid { - val = v.String - } - return nil - }) - return val, err -} - -func writeCatchpointStateString(ctx context.Context, e db.Executable, stateName catchpointState, setValue string) (err error) { - err = db.Retry(func() (err error) { - if setValue == "" { - return deleteCatchpointStateImpl(ctx, e, stateName) - } - - // we don't know if there is an entry in the table for this state, so we'll insert/replace it just in case. - query := "INSERT OR REPLACE INTO catchpointstate(id, strval) VALUES(?, ?)" - _, err = e.ExecContext(ctx, query, stateName, setValue) - return err - }) - return err -} - -func deleteCatchpointStateImpl(ctx context.Context, e db.Executable, stateName catchpointState) error { - query := "DELETE FROM catchpointstate WHERE id=?" - _, err := e.ExecContext(ctx, query, stateName) - return err -} - // accountsOnlineTop returns the top n online accounts starting at position offset // (that is, the top offset'th account through the top offset+n-1'th account). // @@ -3859,59 +3735,3 @@ func deleteOldCatchpointFirstStageInfo(ctx context.Context, e db.Executable, max } return db.Retry(f) } - -func insertUnfinishedCatchpoint(ctx context.Context, e db.Executable, round basics.Round, blockHash crypto.Digest) error { - f := func() error { - query := "INSERT INTO unfinishedcatchpoints(round, blockhash) VALUES(?, ?)" - _, err := e.ExecContext(ctx, query, round, blockHash[:]) - return err - } - return db.Retry(f) -} - -type unfinishedCatchpointRecord struct { - round basics.Round - blockHash crypto.Digest -} - -func selectUnfinishedCatchpoints(ctx context.Context, q db.Queryable) ([]unfinishedCatchpointRecord, error) { - var res []unfinishedCatchpointRecord - - f := func() error { - query := "SELECT round, blockhash FROM unfinishedcatchpoints ORDER BY round" - rows, err := q.QueryContext(ctx, query) - if err != nil { - return err - } - - // Clear `res` in case this function is repeated. - res = res[:0] - for rows.Next() { - var record unfinishedCatchpointRecord - var blockHash []byte - err = rows.Scan(&record.round, &blockHash) - if err != nil { - return err - } - copy(record.blockHash[:], blockHash) - res = append(res, record) - } - - return nil - } - err := db.Retry(f) - if err != nil { - return nil, err - } - - return res, nil -} - -func deleteUnfinishedCatchpoint(ctx context.Context, e db.Executable, round basics.Round) error { - f := func() error { - query := "DELETE FROM unfinishedcatchpoints WHERE round = ?" - _, err := e.ExecContext(ctx, query, round) - return err - } - return db.Retry(f) -} diff --git a/ledger/accountdb_test.go b/ledger/accountdb_test.go index 5daa191eb2..6b2098e3a9 100644 --- a/ledger/accountdb_test.go +++ b/ledger/accountdb_test.go @@ -3173,43 +3173,45 @@ func TestUnfinishedCatchpointsTable(t *testing.T) { dbs, _ := dbOpenTest(t, true) defer dbs.Close() + cts := store.NewCatchpointSQLReaderWriter(dbs.Wdb.Handle) + err := accountsCreateUnfinishedCatchpointsTable( context.Background(), dbs.Wdb.Handle) require.NoError(t, err) var d3 crypto.Digest rand.Read(d3[:]) - err = insertUnfinishedCatchpoint(context.Background(), dbs.Wdb.Handle, 3, d3) + err = cts.InsertUnfinishedCatchpoint(context.Background(), 3, d3) require.NoError(t, err) var d5 crypto.Digest rand.Read(d5[:]) - err = insertUnfinishedCatchpoint(context.Background(), dbs.Wdb.Handle, 5, d5) + err = cts.InsertUnfinishedCatchpoint(context.Background(), 5, d5) require.NoError(t, err) - ret, err := selectUnfinishedCatchpoints(context.Background(), dbs.Rdb.Handle) + ret, err := cts.SelectUnfinishedCatchpoints(context.Background()) require.NoError(t, err) - expected := []unfinishedCatchpointRecord{ + expected := []store.UnfinishedCatchpointRecord{ { - round: 3, - blockHash: d3, + Round: 3, + BlockHash: d3, }, { - round: 5, - blockHash: d5, + Round: 5, + BlockHash: d5, }, } require.Equal(t, expected, ret) - err = deleteUnfinishedCatchpoint(context.Background(), dbs.Wdb.Handle, 3) + err = cts.DeleteUnfinishedCatchpoint(context.Background(), 3) require.NoError(t, err) - ret, err = selectUnfinishedCatchpoints(context.Background(), dbs.Rdb.Handle) + ret, err = cts.SelectUnfinishedCatchpoints(context.Background()) require.NoError(t, err) - expected = []unfinishedCatchpointRecord{ + expected = []store.UnfinishedCatchpointRecord{ { - round: 5, - blockHash: d5, + Round: 5, + BlockHash: d5, }, } require.Equal(t, expected, ret) diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go index d890584332..e3fae8eaa2 100644 --- a/ledger/catchpointtracker.go +++ b/ledger/catchpointtracker.go @@ -97,6 +97,11 @@ func catchpointStage1Decoder(r io.Reader) (io.ReadCloser, error) { return snappyReadCloser{snappy.NewReader(r)}, nil } +type catchpointStore interface { + store.CatchpointWriter + store.CatchpointReader +} + type catchpointTracker struct { // dbDirectory is the directory where the ledger and block sql file resides as well as the parent directory for the catchup files to be generated dbDirectory string @@ -118,7 +123,8 @@ type catchpointTracker struct { log logging.Logger // Connection to the database. - dbs db.Pair + dbs db.Pair + catchpointStore catchpointStore // The last catchpoint label that was written to the database. Should always align with what's in the database. // note that this is the last catchpoint *label* and not the catchpoint file. @@ -231,13 +237,14 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic } f := func(ctx context.Context, tx *sql.Tx) error { + cps := store.NewCatchpointSQLReaderWriter(tx) err := ct.recordFirstStageInfo(ctx, tx, dbRound, totalKVs, totalAccounts, totalChunks, biggestChunkLen) if err != nil { return err } // Clear the db record. - return writeCatchpointStateUint64(ctx, tx, catchpointStateWritingFirstStageInfo, 0) + return cps.WriteCatchpointStateUint64(ctx, catchpointStateWritingFirstStageInfo, 0) } return ct.dbs.Wdb.Atomic(f) } @@ -245,8 +252,8 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic // Possibly finish generating first stage catchpoint db record and data file after // a crash. func (ct *catchpointTracker) finishFirstStageAfterCrash(dbRound basics.Round) error { - v, err := readCatchpointStateUint64( - context.Background(), ct.dbs.Rdb.Handle, catchpointStateWritingFirstStageInfo) + v, err := ct.catchpointStore.ReadCatchpointStateUint64( + context.Background(), catchpointStateWritingFirstStageInfo) if err != nil { return err } @@ -267,7 +274,7 @@ func (ct *catchpointTracker) finishFirstStageAfterCrash(dbRound basics.Round) er } func (ct *catchpointTracker) finishCatchpointsAfterCrash(catchpointLookback uint64) error { - records, err := selectUnfinishedCatchpoints(context.Background(), ct.dbs.Rdb.Handle) + records, err := ct.catchpointStore.SelectUnfinishedCatchpoints(context.Background()) if err != nil { return err } @@ -276,14 +283,14 @@ func (ct *catchpointTracker) finishCatchpointsAfterCrash(catchpointLookback uint // First, delete the unfinished catchpoint file. relCatchpointFilePath := filepath.Join( CatchpointDirName, - makeCatchpointFilePath(basics.Round(record.round))) + makeCatchpointFilePath(basics.Round(record.Round))) err = removeSingleCatchpointFileFromDisk(ct.dbDirectory, relCatchpointFilePath) if err != nil { return err } err = ct.finishCatchpoint( - context.Background(), record.round, record.blockHash, catchpointLookback) + context.Background(), record.Round, record.BlockHash, catchpointLookback) if err != nil { return err } @@ -300,8 +307,8 @@ func (ct *catchpointTracker) recoverFromCrash(dbRound basics.Round) error { ctx := context.Background() - catchpointLookback, err := readCatchpointStateUint64( - ctx, ct.dbs.Rdb.Handle, catchpointStateCatchpointLookback) + catchpointLookback, err := ct.catchpointStore.ReadCatchpointStateUint64( + ctx, catchpointStateCatchpointLookback) if err != nil { return err } @@ -332,6 +339,7 @@ func (ct *catchpointTracker) recoverFromCrash(dbRound basics.Round) error { func (ct *catchpointTracker) loadFromDisk(l ledgerForTracker, dbRound basics.Round) (err error) { ct.log = l.trackerLog() ct.dbs = l.trackerDB() + ct.catchpointStore = store.NewCatchpointSQLReaderWriter(l.trackerDB().Wdb.Handle) ct.roundDigest = nil ct.catchpointDataWriting = 0 @@ -351,8 +359,8 @@ func (ct *catchpointTracker) loadFromDisk(l ledgerForTracker, dbRound basics.Rou return } - ct.lastCatchpointLabel, err = readCatchpointStateString( - context.Background(), ct.dbs.Rdb.Handle, catchpointStateLastCatchpoint) + ct.lastCatchpointLabel, err = ct.catchpointStore.ReadCatchpointStateString( + context.Background(), catchpointStateLastCatchpoint) if err != nil { return } @@ -509,6 +517,8 @@ func (ct *catchpointTracker) commitRound(ctx context.Context, tx *sql.Tx, dcc *d } }() + cps := store.NewCatchpointSQLReaderWriter(tx) + if ct.catchpointEnabled() { var mc *MerkleCommitter mc, err = MakeMerkleCommitter(tx, false) @@ -550,21 +560,19 @@ func (ct *catchpointTracker) commitRound(ctx context.Context, tx *sql.Tx, dcc *d } if dcc.catchpointFirstStage { - err = writeCatchpointStateUint64(ctx, tx, catchpointStateWritingFirstStageInfo, 1) + err = cps.WriteCatchpointStateUint64(ctx, catchpointStateWritingFirstStageInfo, 1) if err != nil { return err } } - err = writeCatchpointStateUint64( - ctx, tx, catchpointStateCatchpointLookback, dcc.catchpointLookback) + err = cps.WriteCatchpointStateUint64(ctx, catchpointStateCatchpointLookback, dcc.catchpointLookback) if err != nil { return err } for _, round := range ct.calculateCatchpointRounds(dcc) { - err = insertUnfinishedCatchpoint( - ctx, tx, round, dcc.committedRoundDigests[round-dcc.oldBase-1]) + err = cps.InsertUnfinishedCatchpoint(ctx, round, dcc.committedRoundDigests[round-dcc.oldBase-1]) if err != nil { return err } @@ -729,8 +737,8 @@ func (ct *catchpointTracker) createCatchpoint(ctx context.Context, accountsRound "creating catchpoint round: %d accountsRound: %d label: %s", round, accountsRound, label) - err := writeCatchpointStateString( - ctx, ct.dbs.Wdb.Handle, catchpointStateLastCatchpoint, label) + err := ct.catchpointStore.WriteCatchpointStateString( + ctx, catchpointStateLastCatchpoint, label) if err != nil { return err } @@ -789,11 +797,12 @@ func (ct *catchpointTracker) createCatchpoint(ctx context.Context, accountsRound } err = ct.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + cps := store.NewCatchpointSQLReaderWriter(tx) err = ct.recordCatchpointFile(ctx, tx, round, relCatchpointFilePath, fileInfo.Size()) if err != nil { return err } - return deleteUnfinishedCatchpoint(ctx, tx, round) + return cps.DeleteUnfinishedCatchpoint(ctx, round) }) if err != nil { return err @@ -824,7 +833,7 @@ func (ct *catchpointTracker) finishCatchpoint(ctx context.Context, round basics. } if !exists { - return deleteUnfinishedCatchpoint(ctx, ct.dbs.Wdb.Handle, round) + return ct.catchpointStore.DeleteUnfinishedCatchpoint(ctx, round) } return ct.createCatchpoint(ctx, accountsRound, round, dataInfo, blockHash) } @@ -1236,7 +1245,7 @@ func makeCatchpointFilePath(round basics.Round) string { // database and storage realign. func (ct *catchpointTracker) recordCatchpointFile(ctx context.Context, e db.Executable, round basics.Round, relCatchpointFilePath string, fileSize int64) (err error) { if ct.catchpointFileHistoryLength != 0 { - err = storeCatchpoint(ctx, e, round, relCatchpointFilePath, "", fileSize) + err = ct.catchpointStore.StoreCatchpoint(ctx, round, relCatchpointFilePath, "", fileSize) if err != nil { ct.log.Warnf("catchpointTracker.recordCatchpointFile() unable to save catchpoint: %v", err) return @@ -1251,8 +1260,11 @@ func (ct *catchpointTracker) recordCatchpointFile(ctx context.Context, e db.Exec if ct.catchpointFileHistoryLength == -1 { return } + + cps := store.NewCatchpointSQLReaderWriter(e) + var filesToDelete map[basics.Round]string - filesToDelete, err = getOldestCatchpointFiles(ctx, e, 2, ct.catchpointFileHistoryLength) + filesToDelete, err = cps.GetOldestCatchpointFiles(ctx, 2, ct.catchpointFileHistoryLength) if err != nil { return fmt.Errorf("unable to delete catchpoint file, getOldestCatchpointFiles failed : %v", err) } @@ -1261,7 +1273,7 @@ func (ct *catchpointTracker) recordCatchpointFile(ctx context.Context, e db.Exec if err != nil { return err } - err = storeCatchpoint(ctx, e, round, "", "", 0) + err = cps.StoreCatchpoint(ctx, round, "", "", 0) if err != nil { return fmt.Errorf("unable to delete old catchpoint entry '%s' : %v", fileToDelete, err) } @@ -1275,8 +1287,10 @@ func (ct *catchpointTracker) GetCatchpointStream(round basics.Round) (ReadCloseS fileSize := int64(0) start := time.Now() ledgerGetcatchpointCount.Inc(nil) + // TODO: we need to generalize this, check @cce PoC PR, he has something + // somewhat broken for some KVs.. err := ct.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { - dbFileName, _, fileSize, err = getCatchpoint(ctx, tx, round) + dbFileName, _, fileSize, err = ct.catchpointStore.GetCatchpoint(ctx, round) return }) ledgerGetcatchpointMicros.AddMicrosecondsSince(start, nil) @@ -1334,9 +1348,10 @@ func (ct *catchpointTracker) GetCatchpointStream(round basics.Round) (ReadCloseS // deleteStoredCatchpoints iterates over the storedcatchpoints table and deletes all the files stored on disk. // once all the files have been deleted, it would go ahead and remove the entries from the table. func deleteStoredCatchpoints(ctx context.Context, e db.Executable, dbDirectory string) (err error) { + cps := store.NewCatchpointSQLReaderWriter(e) catchpointsFilesChunkSize := 50 for { - fileNames, err := getOldestCatchpointFiles(ctx, e, catchpointsFilesChunkSize, 0) + fileNames, err := cps.GetOldestCatchpointFiles(ctx, catchpointsFilesChunkSize, 0) if err != nil { return err } @@ -1350,7 +1365,7 @@ func deleteStoredCatchpoints(ctx context.Context, e db.Executable, dbDirectory s return err } // clear the entry from the database - err = storeCatchpoint(ctx, e, round, "", "", 0) + err = cps.StoreCatchpoint(ctx, round, "", "", 0) if err != nil { return err } diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go index ef636dcf94..19c9b02a39 100644 --- a/ledger/catchpointtracker_test.go +++ b/ledger/catchpointtracker_test.go @@ -107,7 +107,7 @@ func TestGetCatchpointStream(t *testing.T) { require.NoError(t, err) // Store the catchpoint into the database - err := storeCatchpoint(context.Background(), ml.dbs.Wdb.Handle, basics.Round(i), fileName, "", int64(len(data))) + err := ct.catchpointStore.StoreCatchpoint(context.Background(), basics.Round(i), fileName, "", int64(len(data))) require.NoError(t, err) } @@ -134,7 +134,7 @@ func TestGetCatchpointStream(t *testing.T) { require.Nil(t, reader) // File on disk, but database lost the record - err = storeCatchpoint(context.Background(), ml.dbs.Wdb.Handle, basics.Round(3), "", "", 0) + err = ct.catchpointStore.StoreCatchpoint(context.Background(), basics.Round(3), "", "", 0) require.NoError(t, err) reader, err = ct.GetCatchpointStream(basics.Round(3)) require.NoError(t, err) @@ -185,7 +185,7 @@ func TestAcctUpdatesDeleteStoredCatchpoints(t *testing.T) { require.NoError(t, err) err = f.Close() require.NoError(t, err) - err = storeCatchpoint(context.Background(), ml.dbs.Wdb.Handle, basics.Round(i), file, "", 0) + err = ct.catchpointStore.StoreCatchpoint(context.Background(), basics.Round(i), file, "", 0) require.NoError(t, err) } @@ -197,7 +197,7 @@ func TestAcctUpdatesDeleteStoredCatchpoints(t *testing.T) { _, err := os.Open(file) require.True(t, os.IsNotExist(err)) } - fileNames, err := getOldestCatchpointFiles(context.Background(), ml.dbs.Rdb.Handle, dummyCatchpointFilesToCreate, 0) + fileNames, err := ct.catchpointStore.GetOldestCatchpointFiles(context.Background(), dummyCatchpointFilesToCreate, 0) require.NoError(t, err) require.Equal(t, 0, len(fileNames)) } @@ -1001,9 +1001,11 @@ func TestFirstStagePersistence(t *testing.T) { defer ml2.Close() ml.Close() + cps2 := store.NewCatchpointSQLReaderWriter(ml2.dbs.Wdb.Handle) + // Insert unfinished first stage record. - err = writeCatchpointStateUint64( - context.Background(), ml2.dbs.Wdb.Handle, catchpointStateWritingFirstStageInfo, 1) + err = cps2.WriteCatchpointStateUint64( + context.Background(), catchpointStateWritingFirstStageInfo, 1) require.NoError(t, err) // Delete the database record. @@ -1028,8 +1030,8 @@ func TestFirstStagePersistence(t *testing.T) { require.True(t, exists) // Check that the unfinished first stage record is deleted. - v, err := readCatchpointStateUint64( - context.Background(), ml2.dbs.Rdb.Handle, catchpointStateWritingFirstStageInfo) + v, err := ct2.catchpointStore.ReadCatchpointStateUint64( + context.Background(), catchpointStateWritingFirstStageInfo) require.NoError(t, err) require.Zero(t, v) } @@ -1131,19 +1133,21 @@ func TestSecondStagePersistence(t *testing.T) { err = os.WriteFile(catchpointDataFilePath, catchpointData, 0644) require.NoError(t, err) + cps2 := store.NewCatchpointSQLReaderWriter(ml2.dbs.Wdb.Handle) + // Restore the first stage database record. err = insertOrReplaceCatchpointFirstStageInfo( context.Background(), ml2.dbs.Wdb.Handle, firstStageRound, &firstStageInfo) require.NoError(t, err) // Insert unfinished catchpoint record. - err = insertUnfinishedCatchpoint( - context.Background(), ml2.dbs.Wdb.Handle, secondStageRound, crypto.Digest{}) + err = cps2.InsertUnfinishedCatchpoint( + context.Background(), secondStageRound, crypto.Digest{}) require.NoError(t, err) // Delete the catchpoint file database record. - err = storeCatchpoint( - context.Background(), ml2.dbs.Wdb.Handle, secondStageRound, "", "", 0) + err = cps2.StoreCatchpoint( + context.Background(), secondStageRound, "", "", 0) require.NoError(t, err) // Create a catchpoint tracker and let it restart catchpoint's second stage. @@ -1157,14 +1161,14 @@ func TestSecondStagePersistence(t *testing.T) { require.Greater(t, info.Size(), int64(1)) // Check that the database record exists. - filename, _, _, err := getCatchpoint( - context.Background(), ml2.dbs.Rdb.Handle, secondStageRound) + filename, _, _, err := ct2.catchpointStore.GetCatchpoint( + context.Background(), secondStageRound) require.NoError(t, err) require.NotEmpty(t, filename) // Check that the unfinished catchpoint database record is deleted. - unfinishedCatchpoints, err := selectUnfinishedCatchpoints( - context.Background(), ml2.dbs.Rdb.Handle) + unfinishedCatchpoints, err := ct2.catchpointStore.SelectUnfinishedCatchpoints( + context.Background()) require.NoError(t, err) require.Empty(t, unfinishedCatchpoints) } @@ -1253,8 +1257,8 @@ func TestSecondStageDeletesUnfinishedCatchpointRecord(t *testing.T) { ml2.trackers.waitAccountsWriting() // Check that the unfinished catchpoint database record is deleted. - unfinishedCatchpoints, err := selectUnfinishedCatchpoints( - context.Background(), ml2.dbs.Rdb.Handle) + unfinishedCatchpoints, err := ct2.catchpointStore.SelectUnfinishedCatchpoints( + context.Background()) require.NoError(t, err) require.Empty(t, unfinishedCatchpoints) } @@ -1321,6 +1325,8 @@ func TestSecondStageDeletesUnfinishedCatchpointRecordAfterRestart(t *testing.T) defer ml2.Close() ml.Close() + cps2 := store.NewCatchpointSQLReaderWriter(ml2.dbs.Wdb.Handle) + // Sanity check: first stage record should be deleted. _, exists, err := selectCatchpointFirstStageInfo( context.Background(), ml2.dbs.Rdb.Handle, firstStageRound) @@ -1328,8 +1334,8 @@ func TestSecondStageDeletesUnfinishedCatchpointRecordAfterRestart(t *testing.T) require.False(t, exists) // Insert unfinished catchpoint record. - err = insertUnfinishedCatchpoint( - context.Background(), ml2.dbs.Wdb.Handle, secondStageRound, crypto.Digest{}) + err = cps2.InsertUnfinishedCatchpoint( + context.Background(), secondStageRound, crypto.Digest{}) require.NoError(t, err) // Create a catchpoint tracker and let it restart catchpoint's second stage. @@ -1337,8 +1343,8 @@ func TestSecondStageDeletesUnfinishedCatchpointRecordAfterRestart(t *testing.T) defer ct2.close() // Check that the unfinished catchpoint database record is deleted. - unfinishedCatchpoints, err := selectUnfinishedCatchpoints( - context.Background(), ml2.dbs.Rdb.Handle) + unfinishedCatchpoints, err := ct2.catchpointStore.SelectUnfinishedCatchpoints( + context.Background()) require.NoError(t, err) require.Empty(t, unfinishedCatchpoints) } diff --git a/ledger/catchupaccessor.go b/ledger/catchupaccessor.go index 5ca08bff73..71b8c67b73 100644 --- a/ledger/catchupaccessor.go +++ b/ledger/catchupaccessor.go @@ -32,6 +32,7 @@ import ( "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/ledger/ledgercore" + "github.com/algorand/go-algorand/ledger/store" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/util/db" @@ -134,7 +135,8 @@ func (w *stagingWriterImpl) isShared() bool { // catchpointCatchupAccessorImpl is the concrete implementation of the CatchpointCatchupAccessor interface type catchpointCatchupAccessorImpl struct { - ledger *Ledger + ledger *Ledger + catchpointStore catchpointStore stagingWriter stagingWriter @@ -179,16 +181,17 @@ type CatchupAccessorClientLedger interface { // MakeCatchpointCatchupAccessor creates a CatchpointCatchupAccessor given a ledger func MakeCatchpointCatchupAccessor(ledger *Ledger, log logging.Logger) CatchpointCatchupAccessor { return &catchpointCatchupAccessorImpl{ - ledger: ledger, - stagingWriter: &stagingWriterImpl{wdb: ledger.trackerDB().Wdb}, - log: log, + ledger: ledger, + catchpointStore: store.NewCatchpointSQLReaderWriter(ledger.trackerDB().Wdb.Handle), + stagingWriter: &stagingWriterImpl{wdb: ledger.trackerDB().Wdb}, + log: log, } } // GetState returns the current state of the catchpoint catchup func (c *catchpointCatchupAccessorImpl) GetState(ctx context.Context) (state CatchpointCatchupState, err error) { var istate uint64 - istate, err = readCatchpointStateUint64(ctx, c.ledger.trackerDB().Rdb.Handle, catchpointStateCatchupState) + istate, err = c.catchpointStore.ReadCatchpointStateUint64(ctx, catchpointStateCatchupState) if err != nil { return 0, fmt.Errorf("unable to read catchpoint catchup state '%s': %v", catchpointStateCatchupState, err) } @@ -201,7 +204,7 @@ func (c *catchpointCatchupAccessorImpl) SetState(ctx context.Context, state Catc if state < CatchpointCatchupStateInactive || state > catchpointCatchupStateLast { return fmt.Errorf("invalid catchpoint catchup state provided : %d", state) } - err = writeCatchpointStateUint64(ctx, c.ledger.trackerDB().Wdb.Handle, catchpointStateCatchupState, uint64(state)) + err = c.catchpointStore.WriteCatchpointStateUint64(ctx, catchpointStateCatchupState, uint64(state)) if err != nil { return fmt.Errorf("unable to write catchpoint catchup state '%s': %v", catchpointStateCatchupState, err) } @@ -210,7 +213,7 @@ func (c *catchpointCatchupAccessorImpl) SetState(ctx context.Context, state Catc // GetLabel returns the current catchpoint catchup label func (c *catchpointCatchupAccessorImpl) GetLabel(ctx context.Context) (label string, err error) { - label, err = readCatchpointStateString(ctx, c.ledger.trackerDB().Rdb.Handle, catchpointStateCatchupLabel) + label, err = c.catchpointStore.ReadCatchpointStateString(ctx, catchpointStateCatchupLabel) if err != nil { return "", fmt.Errorf("unable to read catchpoint catchup state '%s': %v", catchpointStateCatchupLabel, err) } @@ -224,7 +227,7 @@ func (c *catchpointCatchupAccessorImpl) SetLabel(ctx context.Context, label stri if err != nil { return } - err = writeCatchpointStateString(ctx, c.ledger.trackerDB().Wdb.Handle, catchpointStateCatchupLabel, label) + err = c.catchpointStore.WriteCatchpointStateString(ctx, catchpointStateCatchupLabel, label) if err != nil { return fmt.Errorf("unable to write catchpoint catchup state '%s': %v", catchpointStateCatchupLabel, err) } @@ -240,26 +243,27 @@ func (c *catchpointCatchupAccessorImpl) ResetStagingBalances(ctx context.Context start := time.Now() ledgerResetstagingbalancesCount.Inc(nil) err = wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + cps := store.NewCatchpointSQLReaderWriter(tx) err = resetCatchpointStagingBalances(ctx, tx, newCatchup) if err != nil { return fmt.Errorf("unable to reset catchpoint catchup balances : %v", err) } if !newCatchup { - err = writeCatchpointStateUint64(ctx, tx, catchpointStateCatchupBalancesRound, 0) + err = cps.WriteCatchpointStateUint64(ctx, catchpointStateCatchupBalancesRound, 0) if err != nil { return err } - err = writeCatchpointStateUint64(ctx, tx, catchpointStateCatchupBlockRound, 0) + err = cps.WriteCatchpointStateUint64(ctx, catchpointStateCatchupBlockRound, 0) if err != nil { return err } - err = writeCatchpointStateString(ctx, tx, catchpointStateCatchupLabel, "") + err = cps.WriteCatchpointStateString(ctx, catchpointStateCatchupLabel, "") if err != nil { return err } - err = writeCatchpointStateUint64(ctx, tx, catchpointStateCatchupState, 0) + err = cps.WriteCatchpointStateUint64(ctx, catchpointStateCatchupState, 0) if err != nil { return fmt.Errorf("unable to write catchpoint catchup state '%s': %v", catchpointStateCatchupState, err) } @@ -329,12 +333,13 @@ func (c *catchpointCatchupAccessorImpl) processStagingContent(ctx context.Contex start := time.Now() ledgerProcessstagingcontentCount.Inc(nil) err = wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { - err = writeCatchpointStateUint64(ctx, tx, catchpointStateCatchupBlockRound, uint64(fileHeader.BlocksRound)) + cps := store.NewCatchpointSQLReaderWriter(tx) + err = cps.WriteCatchpointStateUint64(ctx, catchpointStateCatchupBlockRound, uint64(fileHeader.BlocksRound)) if err != nil { return fmt.Errorf("CatchpointCatchupAccessorImpl::processStagingContent: unable to write catchpoint catchup state '%s': %v", catchpointStateCatchupBlockRound, err) } if fileHeader.Version == CatchpointFileVersionV6 { - err = writeCatchpointStateUint64(ctx, tx, catchpointStateCatchupHashRound, uint64(fileHeader.BlocksRound)) + err = cps.WriteCatchpointStateUint64(ctx, catchpointStateCatchupHashRound, uint64(fileHeader.BlocksRound)) if err != nil { return fmt.Errorf("CatchpointCatchupAccessorImpl::processStagingContent: unable to write catchpoint catchup state '%s': %v", catchpointStateCatchupHashRound, err) } @@ -794,7 +799,7 @@ func (c *catchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, pro // GetCatchupBlockRound returns the latest block round matching the current catchpoint func (c *catchpointCatchupAccessorImpl) GetCatchupBlockRound(ctx context.Context) (round basics.Round, err error) { var iRound uint64 - iRound, err = readCatchpointStateUint64(ctx, c.ledger.trackerDB().Rdb.Handle, catchpointStateCatchupBlockRound) + iRound, err = c.catchpointStore.ReadCatchpointStateUint64(ctx, catchpointStateCatchupBlockRound) if err != nil { return 0, fmt.Errorf("unable to read catchpoint catchup state '%s': %v", catchpointStateCatchupBlockRound, err) } @@ -809,13 +814,13 @@ func (c *catchpointCatchupAccessorImpl) VerifyCatchpoint(ctx context.Context, bl var totals ledgercore.AccountTotals var catchpointLabel string - catchpointLabel, err = readCatchpointStateString(ctx, rdb.Handle, catchpointStateCatchupLabel) + catchpointLabel, err = c.catchpointStore.ReadCatchpointStateString(ctx, catchpointStateCatchupLabel) if err != nil { return fmt.Errorf("unable to read catchpoint catchup state '%s': %v", catchpointStateCatchupLabel, err) } var iRound uint64 - iRound, err = readCatchpointStateUint64(ctx, rdb.Handle, catchpointStateCatchupBlockRound) + iRound, err = c.catchpointStore.ReadCatchpointStateUint64(ctx, catchpointStateCatchupBlockRound) if err != nil { return fmt.Errorf("unable to read catchpoint catchup state '%s': %v", catchpointStateCatchupBlockRound, err) } @@ -876,7 +881,8 @@ func (c *catchpointCatchupAccessorImpl) StoreBalancesRound(ctx context.Context, start := time.Now() ledgerStorebalancesroundCount.Inc(nil) err = wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { - err = writeCatchpointStateUint64(ctx, tx, catchpointStateCatchupBalancesRound, uint64(balancesRound)) + cps := store.NewCatchpointSQLReaderWriter(tx) + err = cps.WriteCatchpointStateUint64(ctx, catchpointStateCatchupBalancesRound, uint64(balancesRound)) if err != nil { return fmt.Errorf("CatchpointCatchupAccessorImpl::StoreBalancesRound: unable to write catchpoint catchup state '%s': %v", catchpointStateCatchupBalancesRound, err) } @@ -972,15 +978,17 @@ func (c *catchpointCatchupAccessorImpl) finishBalances(ctx context.Context) (err start := time.Now() ledgerCatchpointFinishBalsCount.Inc(nil) err = wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + cps := store.NewCatchpointSQLReaderWriter(tx) + var balancesRound, hashRound uint64 var totals ledgercore.AccountTotals - balancesRound, err = readCatchpointStateUint64(ctx, tx, catchpointStateCatchupBalancesRound) + balancesRound, err = cps.ReadCatchpointStateUint64(ctx, catchpointStateCatchupBalancesRound) if err != nil { return err } - hashRound, err = readCatchpointStateUint64(ctx, tx, catchpointStateCatchupHashRound) + hashRound, err = cps.ReadCatchpointStateUint64(ctx, catchpointStateCatchupHashRound) if err != nil { return err } @@ -1038,29 +1046,29 @@ func (c *catchpointCatchupAccessorImpl) finishBalances(ctx context.Context) (err return err } - err = writeCatchpointStateUint64(ctx, tx, catchpointStateCatchupBalancesRound, 0) + err = cps.WriteCatchpointStateUint64(ctx, catchpointStateCatchupBalancesRound, 0) if err != nil { return err } - err = writeCatchpointStateUint64(ctx, tx, catchpointStateCatchupBlockRound, 0) + err = cps.WriteCatchpointStateUint64(ctx, catchpointStateCatchupBlockRound, 0) if err != nil { return err } - err = writeCatchpointStateString(ctx, tx, catchpointStateCatchupLabel, "") + err = cps.WriteCatchpointStateString(ctx, catchpointStateCatchupLabel, "") if err != nil { return err } if hashRound != 0 { - err = writeCatchpointStateUint64(ctx, tx, catchpointStateCatchupHashRound, 0) + err = cps.WriteCatchpointStateUint64(ctx, catchpointStateCatchupHashRound, 0) if err != nil { return err } } - err = writeCatchpointStateUint64(ctx, tx, catchpointStateCatchupState, 0) + err = cps.WriteCatchpointStateUint64(ctx, catchpointStateCatchupState, 0) if err != nil { return fmt.Errorf("unable to write catchpoint catchup state '%s': %v", catchpointStateCatchupState, err) } diff --git a/ledger/msgp_gen.go b/ledger/msgp_gen.go index d9e9cdbc57..5ad3f23d9a 100644 --- a/ledger/msgp_gen.go +++ b/ledger/msgp_gen.go @@ -52,14 +52,6 @@ import ( // |-----> (*) Msgsize // |-----> (*) MsgIsZero // -// catchpointState -// |-----> MarshalMsg -// |-----> CanMarshalMsg -// |-----> (*) UnmarshalMsg -// |-----> (*) CanUnmarshalMsg -// |-----> Msgsize -// |-----> MsgIsZero -// // encodedBalanceRecordV5 // |-----> (*) MarshalMsg // |-----> (*) CanMarshalMsg @@ -1438,52 +1430,6 @@ func (z *catchpointFirstStageInfo) MsgIsZero() bool { return ((*z).Totals.MsgIsZero()) && ((*z).TrieBalancesHash.MsgIsZero()) && ((*z).TotalAccounts == 0) && ((*z).TotalKVs == 0) && ((*z).TotalChunks == 0) && ((*z).BiggestChunkLen == 0) } -// MarshalMsg implements msgp.Marshaler -func (z catchpointState) MarshalMsg(b []byte) (o []byte) { - o = msgp.Require(b, z.Msgsize()) - o = msgp.AppendString(o, string(z)) - return -} - -func (_ catchpointState) CanMarshalMsg(z interface{}) bool { - _, ok := (z).(catchpointState) - if !ok { - _, ok = (z).(*catchpointState) - } - return ok -} - -// UnmarshalMsg implements msgp.Unmarshaler -func (z *catchpointState) UnmarshalMsg(bts []byte) (o []byte, err error) { - { - var zb0001 string - zb0001, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - (*z) = catchpointState(zb0001) - } - o = bts - return -} - -func (_ *catchpointState) CanUnmarshalMsg(z interface{}) bool { - _, ok := (z).(*catchpointState) - return ok -} - -// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z catchpointState) Msgsize() (s int) { - s = msgp.StringPrefixSize + len(string(z)) - return -} - -// MsgIsZero returns whether this is a zero value -func (z catchpointState) MsgIsZero() bool { - return z == "" -} - // MarshalMsg implements msgp.Marshaler func (z *encodedBalanceRecordV5) MarshalMsg(b []byte) (o []byte) { o = msgp.Require(b, z.Msgsize()) diff --git a/ledger/store/catchpoint.go b/ledger/store/catchpoint.go new file mode 100644 index 0000000000..f45af7c279 --- /dev/null +++ b/ledger/store/catchpoint.go @@ -0,0 +1,228 @@ +// Copyright (C) 2019-2022 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package store + +import ( + "context" + "database/sql" + + "github.com/algorand/go-algorand/crypto" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/util/db" +) + +// CatchpointState is used to store catchpoint related variables into the catchpointstate table. +type CatchpointState string + +// UnfinishedCatchpointRecord represents a stored record of an unfinished catchpoint. +type UnfinishedCatchpointRecord struct { + Round basics.Round + BlockHash crypto.Digest +} + +type catchpointReader struct { + q db.Queryable +} + +type catchpointWriter struct { + e db.Executable +} + +type catchpointReaderWriter struct { + catchpointReader + catchpointWriter +} + +// NewCatchpointSQLReaderWriter creates a Catchpoint SQL reader+writer +func NewCatchpointSQLReaderWriter(e db.Executable) *catchpointReaderWriter { + return &catchpointReaderWriter{ + catchpointReader{q: e}, + catchpointWriter{e: e}, + } +} + +func (cr *catchpointReader) GetCatchpoint(ctx context.Context, round basics.Round) (fileName string, catchpoint string, fileSize int64, err error) { + err = cr.q.QueryRowContext(ctx, "SELECT filename, catchpoint, filesize FROM storedcatchpoints WHERE round=?", int64(round)).Scan(&fileName, &catchpoint, &fileSize) + return +} + +func (cr *catchpointReader) GetOldestCatchpointFiles(ctx context.Context, fileCount int, filesToKeep int) (fileNames map[basics.Round]string, err error) { + err = db.Retry(func() (err error) { + query := "SELECT round, filename FROM storedcatchpoints WHERE pinned = 0 and round <= COALESCE((SELECT round FROM storedcatchpoints WHERE pinned = 0 ORDER BY round DESC LIMIT ?, 1),0) ORDER BY round ASC LIMIT ?" + rows, err := cr.q.QueryContext(ctx, query, filesToKeep, fileCount) + if err != nil { + return err + } + defer rows.Close() + + fileNames = make(map[basics.Round]string) + for rows.Next() { + var fileName string + var round basics.Round + err = rows.Scan(&round, &fileName) + if err != nil { + return err + } + fileNames[round] = fileName + } + + return rows.Err() + }) + if err != nil { + fileNames = nil + } + return +} + +func (cr *catchpointReader) ReadCatchpointStateUint64(ctx context.Context, stateName CatchpointState) (val uint64, err error) { + err = db.Retry(func() (err error) { + query := "SELECT intval FROM catchpointstate WHERE id=?" + var v sql.NullInt64 + err = cr.q.QueryRowContext(ctx, query, stateName).Scan(&v) + if err == sql.ErrNoRows { + return nil + } + if err != nil { + return err + } + if v.Valid { + val = uint64(v.Int64) + } + return nil + }) + return val, err +} + +func (cr *catchpointReader) ReadCatchpointStateString(ctx context.Context, stateName CatchpointState) (val string, err error) { + err = db.Retry(func() (err error) { + query := "SELECT strval FROM catchpointstate WHERE id=?" + var v sql.NullString + err = cr.q.QueryRowContext(ctx, query, stateName).Scan(&v) + if err == sql.ErrNoRows { + return nil + } + if err != nil { + return err + } + + if v.Valid { + val = v.String + } + return nil + }) + return val, err +} + +func (cr *catchpointReader) SelectUnfinishedCatchpoints(ctx context.Context) ([]UnfinishedCatchpointRecord, error) { + var res []UnfinishedCatchpointRecord + + f := func() error { + query := "SELECT round, blockhash FROM unfinishedcatchpoints ORDER BY round" + rows, err := cr.q.QueryContext(ctx, query) + if err != nil { + return err + } + + // Clear `res` in case this function is repeated. + res = res[:0] + for rows.Next() { + var record UnfinishedCatchpointRecord + var blockHash []byte + err = rows.Scan(&record.Round, &blockHash) + if err != nil { + return err + } + copy(record.BlockHash[:], blockHash) + res = append(res, record) + } + + return nil + } + err := db.Retry(f) + if err != nil { + return nil, err + } + + return res, nil +} + +func (cw *catchpointWriter) StoreCatchpoint(ctx context.Context, round basics.Round, fileName string, catchpoint string, fileSize int64) (err error) { + err = db.Retry(func() (err error) { + query := "DELETE FROM storedcatchpoints WHERE round=?" + _, err = cw.e.ExecContext(ctx, query, round) + if err != nil || (fileName == "" && catchpoint == "" && fileSize == 0) { + return err + } + + query = "INSERT INTO storedcatchpoints(round, filename, catchpoint, filesize, pinned) VALUES(?, ?, ?, ?, 0)" + _, err = cw.e.ExecContext(ctx, query, round, fileName, catchpoint, fileSize) + return err + }) + return +} + +func (cw *catchpointWriter) WriteCatchpointStateUint64(ctx context.Context, stateName CatchpointState, setValue uint64) (err error) { + err = db.Retry(func() (err error) { + if setValue == 0 { + return deleteCatchpointStateImpl(ctx, cw.e, stateName) + } + + // we don't know if there is an entry in the table for this state, so we'll insert/replace it just in case. + query := "INSERT OR REPLACE INTO catchpointstate(id, intval) VALUES(?, ?)" + _, err = cw.e.ExecContext(ctx, query, stateName, setValue) + return err + }) + return err +} + +func (cw *catchpointWriter) WriteCatchpointStateString(ctx context.Context, stateName CatchpointState, setValue string) (err error) { + err = db.Retry(func() (err error) { + if setValue == "" { + return deleteCatchpointStateImpl(ctx, cw.e, stateName) + } + + // we don't know if there is an entry in the table for this state, so we'll insert/replace it just in case. + query := "INSERT OR REPLACE INTO catchpointstate(id, strval) VALUES(?, ?)" + _, err = cw.e.ExecContext(ctx, query, stateName, setValue) + return err + }) + return err +} + +func (cw *catchpointWriter) InsertUnfinishedCatchpoint(ctx context.Context, round basics.Round, blockHash crypto.Digest) error { + f := func() error { + query := "INSERT INTO unfinishedcatchpoints(round, blockhash) VALUES(?, ?)" + _, err := cw.e.ExecContext(ctx, query, round, blockHash[:]) + return err + } + return db.Retry(f) +} + +func (cw *catchpointWriter) DeleteUnfinishedCatchpoint(ctx context.Context, round basics.Round) error { + f := func() error { + query := "DELETE FROM unfinishedcatchpoints WHERE round = ?" + _, err := cw.e.ExecContext(ctx, query, round) + return err + } + return db.Retry(f) +} + +func deleteCatchpointStateImpl(ctx context.Context, e db.Executable, stateName CatchpointState) error { + query := "DELETE FROM catchpointstate WHERE id=?" + _, err := e.ExecContext(ctx, query, stateName) + return err +} diff --git a/ledger/store/interface.go b/ledger/store/interface.go index 835330042d..845af1a6d9 100644 --- a/ledger/store/interface.go +++ b/ledger/store/interface.go @@ -16,7 +16,12 @@ package store -import "github.com/algorand/go-algorand/data/basics" +import ( + "context" + + "github.com/algorand/go-algorand/crypto" + "github.com/algorand/go-algorand/data/basics" +) // AccountsWriter is the write interface for: // - accounts, resources, app kvs, creatables @@ -73,3 +78,27 @@ type OnlineAccountsReader interface { Close() } + +// CatchpointWriter is the write interface for: +// - catchpoints +type CatchpointWriter interface { + StoreCatchpoint(ctx context.Context, round basics.Round, fileName string, catchpoint string, fileSize int64) (err error) + + WriteCatchpointStateUint64(ctx context.Context, stateName CatchpointState, setValue uint64) (err error) + WriteCatchpointStateString(ctx context.Context, stateName CatchpointState, setValue string) (err error) + + InsertUnfinishedCatchpoint(ctx context.Context, round basics.Round, blockHash crypto.Digest) error + DeleteUnfinishedCatchpoint(ctx context.Context, round basics.Round) error +} + +// CatchpointReader is the read interface for: +// - catchpoints +type CatchpointReader interface { + GetCatchpoint(ctx context.Context, round basics.Round) (fileName string, catchpoint string, fileSize int64, err error) + GetOldestCatchpointFiles(ctx context.Context, fileCount int, filesToKeep int) (fileNames map[basics.Round]string, err error) + + ReadCatchpointStateUint64(ctx context.Context, stateName CatchpointState) (val uint64, err error) + ReadCatchpointStateString(ctx context.Context, stateName CatchpointState) (val string, err error) + + SelectUnfinishedCatchpoints(ctx context.Context) ([]UnfinishedCatchpointRecord, error) +} diff --git a/ledger/store/msgp_gen.go b/ledger/store/msgp_gen.go index 0fcd1fb9d2..fb647e62e7 100644 --- a/ledger/store/msgp_gen.go +++ b/ledger/store/msgp_gen.go @@ -33,6 +33,14 @@ import ( // |-----> (*) Msgsize // |-----> (*) MsgIsZero // +// CatchpointState +// |-----> MarshalMsg +// |-----> CanMarshalMsg +// |-----> (*) UnmarshalMsg +// |-----> (*) CanUnmarshalMsg +// |-----> Msgsize +// |-----> MsgIsZero +// // ResourceFlags // |-----> MarshalMsg // |-----> CanMarshalMsg @@ -1104,6 +1112,52 @@ func (z *BaseVotingData) MsgIsZero() bool { return ((*z).VoteID.MsgIsZero()) && ((*z).SelectionID.MsgIsZero()) && ((*z).VoteFirstValid.MsgIsZero()) && ((*z).VoteLastValid.MsgIsZero()) && ((*z).VoteKeyDilution == 0) && ((*z).StateProofID.MsgIsZero()) } +// MarshalMsg implements msgp.Marshaler +func (z CatchpointState) MarshalMsg(b []byte) (o []byte) { + o = msgp.Require(b, z.Msgsize()) + o = msgp.AppendString(o, string(z)) + return +} + +func (_ CatchpointState) CanMarshalMsg(z interface{}) bool { + _, ok := (z).(CatchpointState) + if !ok { + _, ok = (z).(*CatchpointState) + } + return ok +} + +// UnmarshalMsg implements msgp.Unmarshaler +func (z *CatchpointState) UnmarshalMsg(bts []byte) (o []byte, err error) { + { + var zb0001 string + zb0001, bts, err = msgp.ReadStringBytes(bts) + if err != nil { + err = msgp.WrapError(err) + return + } + (*z) = CatchpointState(zb0001) + } + o = bts + return +} + +func (_ *CatchpointState) CanUnmarshalMsg(z interface{}) bool { + _, ok := (z).(*CatchpointState) + return ok +} + +// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message +func (z CatchpointState) Msgsize() (s int) { + s = msgp.StringPrefixSize + len(string(z)) + return +} + +// MsgIsZero returns whether this is a zero value +func (z CatchpointState) MsgIsZero() bool { + return z == "" +} + // MarshalMsg implements msgp.Marshaler func (z ResourceFlags) MarshalMsg(b []byte) (o []byte) { o = msgp.Require(b, z.Msgsize()) diff --git a/ledger/trackerdb.go b/ledger/trackerdb.go index 3b1f6ca017..bb9bf9a8ce 100644 --- a/ledger/trackerdb.go +++ b/ledger/trackerdb.go @@ -28,9 +28,9 @@ import ( "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/crypto" - "github.com/algorand/go-algorand/crypto/merkletrie" "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/ledger/store" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/util/db" @@ -243,7 +243,6 @@ func (tu trackerDBSchemaInitializer) version() int32 { // The accountbase would get initialized with the au.initAccounts // The accounttotals would get initialized to align with the initialization account added to accountbase // The acctrounds would get updated to indicate that the balance matches round 0 -// func (tu *trackerDBSchemaInitializer) upgradeDatabaseSchema0(ctx context.Context, tx *sql.Tx) (err error) { tu.log.Infof("upgradeDatabaseSchema0 initializing schema") tu.newDatabase, err = accountsInit(tx, tu.initAccounts, config.Consensus[tu.initProto]) @@ -268,7 +267,6 @@ func (tu *trackerDBSchemaInitializer) upgradeDatabaseSchema0(ctx context.Context // // This upgrade doesn't change any of the actual database schema ( i.e. tables, indexes ) but rather just performing // a functional update to it's content. -// func (tu *trackerDBSchemaInitializer) upgradeDatabaseSchema1(ctx context.Context, tx *sql.Tx) (err error) { var modifiedAccounts uint if tu.newDatabase { @@ -283,6 +281,8 @@ func (tu *trackerDBSchemaInitializer) upgradeDatabaseSchema1(ctx context.Context } if modifiedAccounts > 0 { + cts := store.NewCatchpointSQLReaderWriter(tx) + tu.log.Infof("upgradeDatabaseSchema1 reencoded %d accounts", modifiedAccounts) tu.log.Infof("upgradeDatabaseSchema1 resetting account hashes") @@ -295,7 +295,7 @@ func (tu *trackerDBSchemaInitializer) upgradeDatabaseSchema1(ctx context.Context tu.log.Infof("upgradeDatabaseSchema1 preparing queries") tu.log.Infof("upgradeDatabaseSchema1 resetting prior catchpoints") // delete the last catchpoint label if we have any. - err = writeCatchpointStateString(ctx, tx, catchpointStateLastCatchpoint, "") + err = cts.WriteCatchpointStateString(ctx, catchpointStateLastCatchpoint, "") if err != nil { return fmt.Errorf("upgradeDatabaseSchema1 unable to clear prior catchpoint : %v", err) } @@ -319,7 +319,6 @@ schemaUpdateComplete: // This upgrade only enables the database vacuuming which will take place once the upgrade process is complete. // If the user has already specified the OptimizeAccountsDatabaseOnStartup flag in the configuration file, this // step becomes a no-op. -// func (tu *trackerDBSchemaInitializer) upgradeDatabaseSchema2(ctx context.Context, tx *sql.Tx) (err error) { if !tu.newDatabase { tu.vacuumOnStartup = true @@ -437,8 +436,9 @@ func (tu *trackerDBSchemaInitializer) upgradeDatabaseSchema5(ctx context.Context } func (tu *trackerDBSchemaInitializer) deleteUnfinishedCatchpoint(ctx context.Context, tx *sql.Tx) error { + cts := store.NewCatchpointSQLReaderWriter(tx) // Delete an unfinished catchpoint if there is one. - round, err := readCatchpointStateUint64(ctx, tx, catchpointStateWritingCatchpoint) + round, err := cts.ReadCatchpointStateUint64(ctx, catchpointStateWritingCatchpoint) if err != nil { return err } @@ -454,7 +454,7 @@ func (tu *trackerDBSchemaInitializer) deleteUnfinishedCatchpoint(ctx context.Con return err } - return writeCatchpointStateUint64(ctx, tx, catchpointStateWritingCatchpoint, 0) + return cts.WriteCatchpointStateUint64(ctx, catchpointStateWritingCatchpoint, 0) } // upgradeDatabaseSchema6 upgrades the database schema from version 6 to version 7, From acb7323000230d6d85c81ee7ba91e82e41208c2a Mon Sep 17 00:00:00 2001 From: Ignacio Corderi Date: Tue, 22 Nov 2022 11:53:50 -0300 Subject: [PATCH 2/3] fix --- ledger/catchpointtracker.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go index e3fae8eaa2..d07285677c 100644 --- a/ledger/catchpointtracker.go +++ b/ledger/catchpointtracker.go @@ -1244,8 +1244,9 @@ func makeCatchpointFilePath(round basics.Round) string { // deleting 2 entries while inserting single entry allow us to adjust the size of the backing storage and have the // database and storage realign. func (ct *catchpointTracker) recordCatchpointFile(ctx context.Context, e db.Executable, round basics.Round, relCatchpointFilePath string, fileSize int64) (err error) { + cps := store.NewCatchpointSQLReaderWriter(e) if ct.catchpointFileHistoryLength != 0 { - err = ct.catchpointStore.StoreCatchpoint(ctx, round, relCatchpointFilePath, "", fileSize) + err = cps.StoreCatchpoint(ctx, round, relCatchpointFilePath, "", fileSize) if err != nil { ct.log.Warnf("catchpointTracker.recordCatchpointFile() unable to save catchpoint: %v", err) return @@ -1260,9 +1261,6 @@ func (ct *catchpointTracker) recordCatchpointFile(ctx context.Context, e db.Exec if ct.catchpointFileHistoryLength == -1 { return } - - cps := store.NewCatchpointSQLReaderWriter(e) - var filesToDelete map[basics.Round]string filesToDelete, err = cps.GetOldestCatchpointFiles(ctx, 2, ct.catchpointFileHistoryLength) if err != nil { @@ -1290,7 +1288,8 @@ func (ct *catchpointTracker) GetCatchpointStream(round basics.Round) (ReadCloseS // TODO: we need to generalize this, check @cce PoC PR, he has something // somewhat broken for some KVs.. err := ct.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { - dbFileName, _, fileSize, err = ct.catchpointStore.GetCatchpoint(ctx, round) + cps := store.NewCatchpointSQLReaderWriter(tx) + dbFileName, _, fileSize, err = cps.GetCatchpoint(ctx, round) return }) ledgerGetcatchpointMicros.AddMicrosecondsSince(start, nil) From dc45020bcf40e2ad6f0fc1a8436213696da1a010 Mon Sep 17 00:00:00 2001 From: Ignacio Corderi Date: Tue, 22 Nov 2022 16:51:10 -0300 Subject: [PATCH 3/3] ignore `CatchpointState` from msgp --- ledger/store/catchpoint.go | 2 ++ ledger/store/msgp_gen.go | 54 -------------------------------------- 2 files changed, 2 insertions(+), 54 deletions(-) diff --git a/ledger/store/catchpoint.go b/ledger/store/catchpoint.go index f45af7c279..19d12f64a6 100644 --- a/ledger/store/catchpoint.go +++ b/ledger/store/catchpoint.go @@ -26,6 +26,8 @@ import ( ) // CatchpointState is used to store catchpoint related variables into the catchpointstate table. +// +//msgp:ignore CatchpointState type CatchpointState string // UnfinishedCatchpointRecord represents a stored record of an unfinished catchpoint. diff --git a/ledger/store/msgp_gen.go b/ledger/store/msgp_gen.go index fb647e62e7..0fcd1fb9d2 100644 --- a/ledger/store/msgp_gen.go +++ b/ledger/store/msgp_gen.go @@ -33,14 +33,6 @@ import ( // |-----> (*) Msgsize // |-----> (*) MsgIsZero // -// CatchpointState -// |-----> MarshalMsg -// |-----> CanMarshalMsg -// |-----> (*) UnmarshalMsg -// |-----> (*) CanUnmarshalMsg -// |-----> Msgsize -// |-----> MsgIsZero -// // ResourceFlags // |-----> MarshalMsg // |-----> CanMarshalMsg @@ -1112,52 +1104,6 @@ func (z *BaseVotingData) MsgIsZero() bool { return ((*z).VoteID.MsgIsZero()) && ((*z).SelectionID.MsgIsZero()) && ((*z).VoteFirstValid.MsgIsZero()) && ((*z).VoteLastValid.MsgIsZero()) && ((*z).VoteKeyDilution == 0) && ((*z).StateProofID.MsgIsZero()) } -// MarshalMsg implements msgp.Marshaler -func (z CatchpointState) MarshalMsg(b []byte) (o []byte) { - o = msgp.Require(b, z.Msgsize()) - o = msgp.AppendString(o, string(z)) - return -} - -func (_ CatchpointState) CanMarshalMsg(z interface{}) bool { - _, ok := (z).(CatchpointState) - if !ok { - _, ok = (z).(*CatchpointState) - } - return ok -} - -// UnmarshalMsg implements msgp.Unmarshaler -func (z *CatchpointState) UnmarshalMsg(bts []byte) (o []byte, err error) { - { - var zb0001 string - zb0001, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - (*z) = CatchpointState(zb0001) - } - o = bts - return -} - -func (_ *CatchpointState) CanUnmarshalMsg(z interface{}) bool { - _, ok := (z).(*CatchpointState) - return ok -} - -// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z CatchpointState) Msgsize() (s int) { - s = msgp.StringPrefixSize + len(string(z)) - return -} - -// MsgIsZero returns whether this is a zero value -func (z CatchpointState) MsgIsZero() bool { - return z == "" -} - // MarshalMsg implements msgp.Marshaler func (z ResourceFlags) MarshalMsg(b []byte) (o []byte) { o = msgp.Require(b, z.Msgsize())