Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 9 additions & 189 deletions ledger/accountdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,35 +239,32 @@ type compactOnlineAccountDeltas struct {
misses []int
}

// catchpointState is used to store catchpoint related variables into the catchpointstate table.
type catchpointState string

const (
// catchpointStateLastCatchpoint is written by a node once a catchpoint label is created for a round
catchpointStateLastCatchpoint = catchpointState("lastCatchpoint")
catchpointStateLastCatchpoint = store.CatchpointState("lastCatchpoint")
// This state variable is set to 1 if catchpoint's first stage is unfinished,
// and is 0 otherwise. Used to clear / restart the first stage after a crash.
// This key is set in the same db transaction as the account updates, so the
// unfinished first stage corresponds to the current db round.
catchpointStateWritingFirstStageInfo = catchpointState("writingFirstStageInfo")
catchpointStateWritingFirstStageInfo = store.CatchpointState("writingFirstStageInfo")
// If there is an unfinished catchpoint, this state variable is set to
// the catchpoint's round. Otherwise, it is set to 0.
// DEPRECATED.
catchpointStateWritingCatchpoint = catchpointState("writingCatchpoint")
catchpointStateWritingCatchpoint = store.CatchpointState("writingCatchpoint")
// catchpointCatchupState is the state of the catchup process. The variable is stored only during the catchpoint catchup process, and removed afterward.
catchpointStateCatchupState = catchpointState("catchpointCatchupState")
catchpointStateCatchupState = store.CatchpointState("catchpointCatchupState")
// catchpointStateCatchupLabel is the label to which the currently catchpoint catchup process is trying to catchup to.
catchpointStateCatchupLabel = catchpointState("catchpointCatchupLabel")
catchpointStateCatchupLabel = store.CatchpointState("catchpointCatchupLabel")
// catchpointCatchupBlockRound is the block round that is associated with the current running catchpoint catchup.
catchpointStateCatchupBlockRound = catchpointState("catchpointCatchupBlockRound")
catchpointStateCatchupBlockRound = store.CatchpointState("catchpointCatchupBlockRound")
// catchpointStateCatchupBalancesRound is the balance round that is associated with the current running catchpoint catchup. Typically it would be
// equal to catchpointStateCatchupBlockRound - 320.
catchpointStateCatchupBalancesRound = catchpointState("catchpointCatchupBalancesRound")
catchpointStateCatchupBalancesRound = store.CatchpointState("catchpointCatchupBalancesRound")
// catchpointStateCatchupHashRound is the round that is associated with the hash of the merkle trie. Normally, it's identical to catchpointStateCatchupBalancesRound,
// however, it could differ when we catchup from a catchpoint that was created using a different version : in this case,
// we set it to zero in order to reset the merkle trie. This would force the merkle trie to be re-build on startup ( if needed ).
catchpointStateCatchupHashRound = catchpointState("catchpointCatchupHashRound")
catchpointStateCatchpointLookback = catchpointState("catchpointLookback")
catchpointStateCatchupHashRound = store.CatchpointState("catchpointCatchupHashRound")
catchpointStateCatchpointLookback = store.CatchpointState("catchpointLookback")
)

// MaxEncodedBaseAccountDataSize is a rough estimate for the worst-case scenario we're going to have of the base account data serialized.
Expand Down Expand Up @@ -1145,11 +1142,6 @@ func applyCatchpointStagingBalances(ctx context.Context, tx *sql.Tx, balancesRou
return
}

func getCatchpoint(ctx context.Context, q db.Queryable, round basics.Round) (fileName string, catchpoint string, fileSize int64, err error) {
err = q.QueryRowContext(ctx, "SELECT filename, catchpoint, filesize FROM storedcatchpoints WHERE round=?", int64(round)).Scan(&fileName, &catchpoint, &fileSize)
return
}

// accountsInit fills the database using tx with initAccounts if the
// database has not been initialized yet.
//
Expand Down Expand Up @@ -1916,122 +1908,6 @@ func accountsHashRound(ctx context.Context, tx *sql.Tx) (hashrnd basics.Round, e
return
}

func storeCatchpoint(ctx context.Context, e db.Executable, round basics.Round, fileName string, catchpoint string, fileSize int64) (err error) {
err = db.Retry(func() (err error) {
query := "DELETE FROM storedcatchpoints WHERE round=?"
_, err = e.ExecContext(ctx, query, round)
if err != nil || (fileName == "" && catchpoint == "" && fileSize == 0) {
return err
}

query = "INSERT INTO storedcatchpoints(round, filename, catchpoint, filesize, pinned) VALUES(?, ?, ?, ?, 0)"
_, err = e.ExecContext(ctx, query, round, fileName, catchpoint, fileSize)
return err
})
return
}

func getOldestCatchpointFiles(ctx context.Context, q db.Queryable, fileCount int, filesToKeep int) (fileNames map[basics.Round]string, err error) {
err = db.Retry(func() (err error) {
query := "SELECT round, filename FROM storedcatchpoints WHERE pinned = 0 and round <= COALESCE((SELECT round FROM storedcatchpoints WHERE pinned = 0 ORDER BY round DESC LIMIT ?, 1),0) ORDER BY round ASC LIMIT ?"
rows, err := q.QueryContext(ctx, query, filesToKeep, fileCount)
if err != nil {
return err
}
defer rows.Close()

fileNames = make(map[basics.Round]string)
for rows.Next() {
var fileName string
var round basics.Round
err = rows.Scan(&round, &fileName)
if err != nil {
return err
}
fileNames[round] = fileName
}

return rows.Err()
})
if err != nil {
fileNames = nil
}
return
}

func readCatchpointStateUint64(ctx context.Context, q db.Queryable, stateName catchpointState) (val uint64, err error) {
err = db.Retry(func() (err error) {
query := "SELECT intval FROM catchpointstate WHERE id=?"
var v sql.NullInt64
err = q.QueryRowContext(ctx, query, stateName).Scan(&v)
if err == sql.ErrNoRows {
return nil
}
if err != nil {
return err
}
if v.Valid {
val = uint64(v.Int64)
}
return nil
})
return val, err
}

func writeCatchpointStateUint64(ctx context.Context, e db.Executable, stateName catchpointState, setValue uint64) (err error) {
err = db.Retry(func() (err error) {
if setValue == 0 {
return deleteCatchpointStateImpl(ctx, e, stateName)
}

// we don't know if there is an entry in the table for this state, so we'll insert/replace it just in case.
query := "INSERT OR REPLACE INTO catchpointstate(id, intval) VALUES(?, ?)"
_, err = e.ExecContext(ctx, query, stateName, setValue)
return err
})
return err
}

func readCatchpointStateString(ctx context.Context, q db.Queryable, stateName catchpointState) (val string, err error) {
err = db.Retry(func() (err error) {
query := "SELECT strval FROM catchpointstate WHERE id=?"
var v sql.NullString
err = q.QueryRowContext(ctx, query, stateName).Scan(&v)
if err == sql.ErrNoRows {
return nil
}
if err != nil {
return err
}

if v.Valid {
val = v.String
}
return nil
})
return val, err
}

func writeCatchpointStateString(ctx context.Context, e db.Executable, stateName catchpointState, setValue string) (err error) {
err = db.Retry(func() (err error) {
if setValue == "" {
return deleteCatchpointStateImpl(ctx, e, stateName)
}

// we don't know if there is an entry in the table for this state, so we'll insert/replace it just in case.
query := "INSERT OR REPLACE INTO catchpointstate(id, strval) VALUES(?, ?)"
_, err = e.ExecContext(ctx, query, stateName, setValue)
return err
})
return err
}

func deleteCatchpointStateImpl(ctx context.Context, e db.Executable, stateName catchpointState) error {
query := "DELETE FROM catchpointstate WHERE id=?"
_, err := e.ExecContext(ctx, query, stateName)
return err
}

// accountsOnlineTop returns the top n online accounts starting at position offset
// (that is, the top offset'th account through the top offset+n-1'th account).
//
Expand Down Expand Up @@ -3859,59 +3735,3 @@ func deleteOldCatchpointFirstStageInfo(ctx context.Context, e db.Executable, max
}
return db.Retry(f)
}

func insertUnfinishedCatchpoint(ctx context.Context, e db.Executable, round basics.Round, blockHash crypto.Digest) error {
f := func() error {
query := "INSERT INTO unfinishedcatchpoints(round, blockhash) VALUES(?, ?)"
_, err := e.ExecContext(ctx, query, round, blockHash[:])
return err
}
return db.Retry(f)
}

type unfinishedCatchpointRecord struct {
round basics.Round
blockHash crypto.Digest
}

func selectUnfinishedCatchpoints(ctx context.Context, q db.Queryable) ([]unfinishedCatchpointRecord, error) {
var res []unfinishedCatchpointRecord

f := func() error {
query := "SELECT round, blockhash FROM unfinishedcatchpoints ORDER BY round"
rows, err := q.QueryContext(ctx, query)
if err != nil {
return err
}

// Clear `res` in case this function is repeated.
res = res[:0]
for rows.Next() {
var record unfinishedCatchpointRecord
var blockHash []byte
err = rows.Scan(&record.round, &blockHash)
if err != nil {
return err
}
copy(record.blockHash[:], blockHash)
res = append(res, record)
}

return nil
}
err := db.Retry(f)
if err != nil {
return nil, err
}

return res, nil
}

func deleteUnfinishedCatchpoint(ctx context.Context, e db.Executable, round basics.Round) error {
f := func() error {
query := "DELETE FROM unfinishedcatchpoints WHERE round = ?"
_, err := e.ExecContext(ctx, query, round)
return err
}
return db.Retry(f)
}
28 changes: 15 additions & 13 deletions ledger/accountdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3173,43 +3173,45 @@ func TestUnfinishedCatchpointsTable(t *testing.T) {
dbs, _ := dbOpenTest(t, true)
defer dbs.Close()

cts := store.NewCatchpointSQLReaderWriter(dbs.Wdb.Handle)

err := accountsCreateUnfinishedCatchpointsTable(
context.Background(), dbs.Wdb.Handle)
require.NoError(t, err)

var d3 crypto.Digest
rand.Read(d3[:])
err = insertUnfinishedCatchpoint(context.Background(), dbs.Wdb.Handle, 3, d3)
err = cts.InsertUnfinishedCatchpoint(context.Background(), 3, d3)
require.NoError(t, err)

var d5 crypto.Digest
rand.Read(d5[:])
err = insertUnfinishedCatchpoint(context.Background(), dbs.Wdb.Handle, 5, d5)
err = cts.InsertUnfinishedCatchpoint(context.Background(), 5, d5)
require.NoError(t, err)

ret, err := selectUnfinishedCatchpoints(context.Background(), dbs.Rdb.Handle)
ret, err := cts.SelectUnfinishedCatchpoints(context.Background())
require.NoError(t, err)
expected := []unfinishedCatchpointRecord{
expected := []store.UnfinishedCatchpointRecord{
{
round: 3,
blockHash: d3,
Round: 3,
BlockHash: d3,
},
{
round: 5,
blockHash: d5,
Round: 5,
BlockHash: d5,
},
}
require.Equal(t, expected, ret)

err = deleteUnfinishedCatchpoint(context.Background(), dbs.Wdb.Handle, 3)
err = cts.DeleteUnfinishedCatchpoint(context.Background(), 3)
require.NoError(t, err)

ret, err = selectUnfinishedCatchpoints(context.Background(), dbs.Rdb.Handle)
ret, err = cts.SelectUnfinishedCatchpoints(context.Background())
require.NoError(t, err)
expected = []unfinishedCatchpointRecord{
expected = []store.UnfinishedCatchpointRecord{
{
round: 5,
blockHash: d5,
Round: 5,
BlockHash: d5,
},
}
require.Equal(t, expected, ret)
Expand Down
Loading